Change series and symbol table format

pull/5805/head
Fabian Reinartz 2017-04-25 16:45:44 +02:00
parent da068500c6
commit 433e73f865
4 changed files with 195 additions and 170 deletions

View File

@ -3,27 +3,11 @@
The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block.
```
┌────────────────────────────┬──────────────────┐
│ magic(0x85BD40DD) <4 byte> | version <1 byte>
├────────────────────────────┴──────────────────┤
│ Body ... │
└───────────────────────────────────────────────┘
┌─────────────────────────────┬─────────────────────┐
│ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte>
├─────────────────────────────┴─────────────────────┤
│ ┌──────────────┬───────────────────┬────────┐ │
│ │ len <varint> │ encoding <1 byte> │ data │ ... │
│ └──────────────┴───────────────────┴────────┘ │
└───────────────────────────────────────────────────┘
```
Available versions:
* v1 (`1`)
## Body (v1)
The body contains a sequence of chunks, each of which has the following format.
```
┌─────────────────────────────────────────────────────────┐
│ ┌──────────────┬───────────────────┬────────────┐ │
│ │ len <varint> | encoding <1 byte> │ data │ ... │
│ └──────────────┴───────────────────┴────────────┘ │
└─────────────────────────────────────────────────────────┘
```
The length marks the length of the encoding byte and data combined.
The CRC checksum is calculated over the encoding byte and data.

View File

@ -3,31 +3,25 @@
The following describes the format of the `index` file found in each block directory.
```
┌────────────────────────────┬──────────────────┐
│ magic(0xBAAAD700) <4 byte> │ version <1 byte>
├────────────────────────────┴──────────────────┤
│ Body ... │
└───────────────────────────────────────────────┘
```
## Body (v1)
The body is split into the following parts:
```
┌───────────────────────────────────────────────┐
│ Symbol Table │
├───────────────────────────────────────────────┤
│ Series │
├───────────────────────────────────────────────┤
│ Label Index │
├───────────────────────────────────────────────┤
│ Postings │
├───────────────────────────────────────────────┤
│ Body ... │
├───────────────────────────────────────────────┤
│ Body ... │
└───────────────────────────────────────────────┘
┌────────────────────────────┬─────────────────────┐
│ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte>
├────────────────────────────┴─────────────────────┤
│ ┌──────────────────────────────────────────────┐ │
│ │ Symbol Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Series │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Label Index │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
```
@ -35,15 +29,13 @@ The body is split into the following parts:
The symbol table holds all strings encountered in our index. All other index sections just reference strings in the table as they are highly repetitive.
#### v1, section(`1`)
The section contains a sequence of the raw string data, each prefixed with the string's length.
Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order.
```
┌────────────────────────────────────────┐
version <1 byte> │ len <4 byte>
├────────────────────────────────────────┤
┌────────────────────────────────────────┐
count(symbols) <4 byte> │ len <4 byte>
├────────────────────────────────────────┤
│ ┌─────────────────────┬───────────────┐ │
│ │ len(str_1) <varint> │ str_1 <bytes> │ │
│ ├─────────────────────┴───────────────┤ │
@ -52,38 +44,56 @@ Strings are referenced by pointing to the beginning of their length field. The s
│ │ len(str_n) <varint> │ str_1 <bytes> │ │
│ └─────────────────────┴───────────────┘ │
├─────────────────────────────────────────┤
│ CRC <4 byte>
│ CRC32 <4 byte>
└─────────────────────────────────────────┘
```
### Series
#### v1, section(`1`)
The section contains a sequence of series that hold the label set of the series as well as the chunks within the block. The series are sorted lexicographically by their label sets.
The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets.
```
┌────────────┬─────────┬────────────┐
│ series_1 │ . . . │ series_n │
└────────────┴─────────┴────────────┘
┌───────────────────────────────────────┐
│ count(series) <4 byte>
├───────────────────────────────────────┤
│ ┌───────────────────────────────────┐ │
│ │ series_1 │ │
│ ├───────────────────────────────────┤ │
│ │ . . . │ │
│ ├───────────────────────────────────┤ │
│ │ series_n │ │
│ └───────────────────────────────────┘ │
└───────────────────────────────────────┘
```
Every series holds a list of label pairs and chunks. The label pairs reference the symbol table and the chunks an address in one of the block's chunk files.
```
┌──────────────────┬────────────────────────────────────────────────────────────────────────┐
│ │ ┌────────────────────────┬─────────────────────────┐ │
#labels <varint> │ │ ref(l_i.name) <varint> │ ref(l_i.value) <varint> │ ... │
│ │ └────────────────────────┴─────────────────────────┘ │
├──────────────────┼────────────────────────────────────────────────────────────────────────┤
│ │ ┌───────────────────┬───────────────────┬────────────────────────┐ │
#chunks <varint> │ │ c_i.mint <varint> │ c_i.maxt <varint> │ ref(c_i.data) <varint> │ ... │
│ │ └───────────────────┴───────────────────┴────────────────────────┘ │
├──────────────────┴────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4 byte>
└───────────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ len <varint>
├─────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────┐ │
│ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ ref(l_i.name) <varint> │ │ │
│ │ #labels <varint> │ ├──────────────────────────┤ ... │ │
│ │ │ │ ref(l_i.value) <varint> │ │ │
│ │ │ └──────────────────────────┘ │ │
│ ├──────────────────┼──────────────────────────────────┤ │
│ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ c_i.mint <varint> │ │ │
│ │ │ ├──────────────────────────┤ │ │
│ │ │ │ c_i.maxt <varint> │ │ │
│ │ #chunks <varint> │ ├──────────────────────────┤ ... │ │
│ │ │ │ ref(c_i.data) <varint> │ │ │
│ │ │ ├──────────────────────────┤ │ │
│ │ │ │ crc32(c_i.data) <varint> │ │ │
│ │ │ └──────────────────────────┘ │ │
│ └──────────────────┴──────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ CRC32 <4 byte>
└─────────────────────────────────────────────────────────┘
```
The CRC checksum is calculated over the series contents of the index concatenated with the data of its chunks (with encoding byte, without length).

View File

@ -17,6 +17,7 @@ import (
"bufio"
"encoding/binary"
"fmt"
"hash"
"io"
"os"
@ -41,6 +42,16 @@ type ChunkMeta struct {
MinTime, MaxTime int64 // time range the data covers
}
func (cm *ChunkMeta) hash(h hash.Hash) error {
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
return err
}
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
return err
}
return nil
}
// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
@ -194,14 +205,14 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
for _, chk := range chks {
chk.Ref = seq | uint64(w.n)
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
return err
}
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
if err := w.write(b[:n]); err != nil {
return err
}
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
return err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
return err
}
@ -283,17 +294,16 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
if int(off) >= len(b) {
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
}
enc := chunks.Encoding(b[off])
b = b[off+1:]
b = b[off:]
l, n := binary.Uvarint(b)
if n < 0 {
return nil, fmt.Errorf("reading chunk length failed")
}
b = b[n:]
enc := chunks.Encoding(b[0])
c, err := chunks.FromData(enc, b[0:l])
c, err := chunks.FromData(enc, b[1:1+l])
if err != nil {
return nil, err
}

207
index.go
View File

@ -35,6 +35,8 @@ const (
MagicIndex = 0xBAAAD700
indexFormatV1 = 1
indexSeriesFormatV1 = 1
)
const compactionPageBytes = minSectorSize * 64
@ -71,8 +73,8 @@ type indexWriterSeries struct {
// serialization format.
type indexWriter struct {
f *os.File
bufw *bufio.Writer
n int64
fbuf *bufio.Writer
pos int
started bool
// Reusable memory.
@ -102,8 +104,8 @@ func newIndexWriter(dir string) (*indexWriter, error) {
iw := &indexWriter{
f: f,
bufw: bufio.NewWriterSize(f, 1<<22),
n: 0,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
// Reusable memory.
b: make([]byte, 0, 1<<23),
@ -120,40 +122,19 @@ func newIndexWriter(dir string) (*indexWriter, error) {
return iw, nil
}
func (w *indexWriter) write(wr io.Writer, b []byte) error {
n, err := wr.Write(b)
w.n += int64(n)
func (w *indexWriter) write(b []byte) error {
n, err := w.fbuf.Write(b)
w.pos += n
return err
}
// section writes a CRC32 checksummed section of length l and guarded by flag.
func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
w.crc32.Reset()
wr := io.MultiWriter(w.crc32, w.bufw)
b := [5]byte{flag, 0, 0, 0, 0}
binary.BigEndian.PutUint32(b[1:], uint32(l))
if err := w.write(wr, b[:]); err != nil {
return errors.Wrap(err, "writing header")
}
if err := f(wr); err != nil {
return errors.Wrap(err, "write contents")
}
if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
return errors.Wrap(err, "writing checksum")
}
return nil
}
func (w *indexWriter) writeMeta() error {
b := [8]byte{}
b := [5]byte{}
binary.BigEndian.PutUint32(b[:4], MagicIndex)
b[4] = flagStd
return w.write(w.bufw, b[:])
return w.write(b[:])
}
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
@ -181,24 +162,36 @@ func (w *indexWriter) writeSymbols() error {
}
sort.Strings(symbols)
// The start of the section plus a 5 byte section header are our base.
// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
base := uint32(w.n) + 5
buf := make([]byte, 8)
buf := [binary.MaxVarintLen32]byte{}
w.b = append(w.b[:0], flagStd)
// 8 byte header of symbol count and serialization length.
binary.BigEndian.PutUint32(buf[:4], uint32(len(symbols)))
w.b = w.b[:0]
w.b = append(w.b, buf...)
for _, s := range symbols {
w.symbols[s] = base + uint32(len(w.b))
w.symbols[s] = uint32(w.pos + len(w.b))
n := binary.PutUvarint(buf[:], uint64(len(s)))
n := binary.PutUvarint(buf, uint64(len(s)))
w.b = append(w.b, buf[:n]...)
w.b = append(w.b, s...)
}
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
binary.BigEndian.PutUint32(buf[:4], uint32(len(w.b))-8)
copy(w.b[4:], buf[:4])
w.crc32.Reset()
// Write checksum over contents excluding the 8 byte header.
if _, err := w.crc32.Write(w.b[8:]); err != nil {
return errors.Wrap(err, "calculate symbols CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
if err := w.write(w.b); err != nil {
return errors.Wrap(err, "write symbols")
}
return nil
}
type indexWriterSeriesSlice []*indexWriterSeries
@ -219,17 +212,16 @@ func (w *indexWriter) writeSeries() error {
}
sort.Sort(series)
// Current end of file plus 5 bytes for section header.
// TODO(fabxc): switch to relative offsets.
base := uint32(w.n) + 5
w.b = w.b[:0]
buf := make([]byte, binary.MaxVarintLen64)
// Header holds number of series.
binary.BigEndian.PutUint32(buf, uint32(len(series)))
if err := w.write(buf[:4]); err != nil {
return errors.Wrap(err, "write series count")
}
for _, s := range series {
// Write label set symbol references.
start := len(w.b)
s.offset = base + uint32(start)
w.b = w.b[:0]
n := binary.PutUvarint(buf, uint64(len(s.labels)))
w.b = append(w.b, buf[:n]...)
@ -253,28 +245,33 @@ func (w *indexWriter) writeSeries() error {
n = binary.PutUvarint(buf, uint64(c.Ref))
w.b = append(w.b, buf[:n]...)
w.crc32.Reset()
if err := c.hash(w.crc32); err != nil {
return errors.Wrap(err, "calculate chunk CRC32")
}
w.b = w.crc32.Sum(w.b)
}
s.offset = uint32(w.pos)
n = binary.PutUvarint(buf, uint64(len(w.b)))
if err := w.write(buf[:n]); err != nil {
return errors.Wrap(err, "write series data size")
}
// Write checksum over series index entry and all its chunk data.
w.crc32.Reset()
w.crc32.Write(w.b[start:])
for _, c := range s.chunks {
fmt.Println(c)
if _, err := w.crc32.Write([]byte{byte(c.Chunk.Encoding())}); err != nil {
return err
}
if _, err := w.crc32.Write(c.Chunk.Bytes()); err != nil {
return err
}
if _, err := w.crc32.Write(w.b); err != nil {
return errors.Wrap(err, "calculate series CRC32")
}
w.b = w.crc32.Sum(w.b)
w.b = append(w.b, w.crc32.Sum(nil)...)
if err := w.write(w.b); err != nil {
return errors.Wrap(err, "write series data")
}
}
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
return nil
}
func (w *indexWriter) init() error {
@ -304,7 +301,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
w.labelIndexes = append(w.labelIndexes, hashEntry{
name: strings.Join(names, string(sep)),
offset: uint32(w.n),
offset: uint32(w.pos),
})
buf := make([]byte, binary.MaxVarintLen32)
@ -312,21 +309,23 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
l := n + len(values)*4
return w.section(l, flagStd, func(wr io.Writer) error {
// First byte indicates tuple size for index.
if err := w.write(wr, buf[:n]); err != nil {
return err
}
w.b = append(w.b[:0], flagStd, 0, 0, 0, 0)
binary.BigEndian.PutUint32(w.b[1:], uint32(l))
for _, v := range valt.s {
binary.BigEndian.PutUint32(buf, w.symbols[v])
w.b = append(w.b, buf[:n]...)
if err := w.write(wr, buf[:4]); err != nil {
return err
}
}
return nil
})
for _, v := range valt.s {
binary.BigEndian.PutUint32(buf, w.symbols[v])
w.b = append(w.b, buf[:4]...)
}
w.crc32.Reset()
if _, err := w.crc32.Write(w.b[5:]); err != nil {
return errors.Wrap(err, "calculate label index CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
return w.write(w.b)
}
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
@ -340,7 +339,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
w.postings = append(w.postings, hashEntry{
name: key,
offset: uint32(w.n),
offset: uint32(w.pos),
})
// Order of the references in the postings list does not imply order
@ -361,7 +360,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
sort.Sort(uint32slice(refs))
w.b = w.b[:0]
w.b = append(w.b[:0], flagStd, 0, 0, 0, 0)
buf := make([]byte, 4)
for _, r := range refs {
@ -371,9 +370,15 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
w.uint32s = refs[:0]
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5))
w.crc32.Reset()
if _, err := w.crc32.Write(w.b[5:]); err != nil {
return errors.Wrap(err, "calculate label index CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
return w.write(w.b)
}
type uint32slice []uint32
@ -388,7 +393,7 @@ type hashEntry struct {
}
func (w *indexWriter) writeHashmap(h []hashEntry) error {
w.b = w.b[:0]
w.b = append(w.b[:0], flagStd, 0, 0, 0, 0)
buf := [binary.MaxVarintLen32]byte{}
for _, e := range h {
@ -400,19 +405,25 @@ func (w *indexWriter) writeHashmap(h []hashEntry) error {
w.b = append(w.b, buf[:n]...)
}
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5))
w.crc32.Reset()
if _, err := w.crc32.Write(w.b[5:]); err != nil {
return errors.Wrap(err, "calculate label index CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
return w.write(w.b)
}
func (w *indexWriter) finalize() error {
// Write out hash maps to jump to correct label index and postings sections.
lo := uint32(w.n)
lo := uint32(w.pos)
if err := w.writeHashmap(w.labelIndexes); err != nil {
return err
}
po := uint32(w.n)
po := uint32(w.pos)
if err := w.writeHashmap(w.postings); err != nil {
return err
}
@ -425,14 +436,14 @@ func (w *indexWriter) finalize() error {
binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po)
return w.write(w.bufw, b[:])
return w.write(b[:])
}
func (w *indexWriter) Close() error {
if err := w.finalize(); err != nil {
return err
}
if err := w.bufw.Flush(); err != nil {
if err := w.fbuf.Flush(); err != nil {
return err
}
if err := fileutil.Fsync(w.f); err != nil {
@ -588,7 +599,7 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
end := int(o) + n + int(l)
if end > len(r.b) {
return "", errors.New("invalid length")
return "", errors.Errorf("invalid length %d", l)
}
b := r.b[int(o)+n : end]
@ -640,12 +651,16 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
}
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
k, n := binary.Uvarint(r.b[ref:])
// Read away length of series data.
_, n := binary.Uvarint(r.b[ref:])
b := r.b[int(ref)+n:]
k, n := binary.Uvarint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
}
b := r.b[int(ref)+n:]
b = b[n:]
lbls := make(labels.Labels, 0, k)
for i := 0; i < 2*int(k); i += 2 {
@ -703,6 +718,9 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
}
b = b[n:]
// TODO(fabxc): read and potentially verify checksum.
b = b[4:]
chunks = append(chunks, &ChunkMeta{
Ref: o,
MinTime: firstTime,
@ -710,6 +728,8 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
})
}
// TODO(fabxc): read and potentially verify checksum.
return lbls, chunks, nil
}
@ -734,6 +754,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
if len(b)%4 != 0 {
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
}
return newBigEndianPostings(b), nil
}