diff --git a/Documentation/format/chunks.md b/Documentation/format/chunks.md index fdb45384c..d5b71f0b6 100644 --- a/Documentation/format/chunks.md +++ b/Documentation/format/chunks.md @@ -3,27 +3,11 @@ The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. ``` - ┌────────────────────────────┬──────────────────┐ - │ magic(0x85BD40DD) <4 byte> | version <1 byte> │ - ├────────────────────────────┴──────────────────┤ - │ Body ... │ - └───────────────────────────────────────────────┘ + ┌─────────────────────────────┬─────────────────────┐ + │ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte> │ + ├─────────────────────────────┴─────────────────────┤ + │ ┌──────────────┬───────────────────┬────────┐ │ + │ │ len │ encoding <1 byte> │ data │ ... │ + │ └──────────────┴───────────────────┴────────┘ │ + └───────────────────────────────────────────────────┘ ``` - -Available versions: -* v1 (`1`) - -## Body (v1) - -The body contains a sequence of chunks, each of which has the following format. - -``` -┌─────────────────────────────────────────────────────────┐ -│ ┌──────────────┬───────────────────┬────────────┐ │ -│ │ len | encoding <1 byte> │ data │ ... │ -│ └──────────────┴───────────────────┴────────────┘ │ -└─────────────────────────────────────────────────────────┘ -``` - -The length marks the length of the encoding byte and data combined. -The CRC checksum is calculated over the encoding byte and data. \ No newline at end of file diff --git a/Documentation/format/index.md b/Documentation/format/index.md index 54dc73752..8b0112bef 100644 --- a/Documentation/format/index.md +++ b/Documentation/format/index.md @@ -3,31 +3,25 @@ The following describes the format of the `index` file found in each block directory. ``` - ┌────────────────────────────┬──────────────────┐ - │ magic(0xBAAAD700) <4 byte> │ version <1 byte> │ - ├────────────────────────────┴──────────────────┤ - │ Body ... │ - └───────────────────────────────────────────────┘ -``` - -## Body (v1) - -The body is split into the following parts: - -``` - ┌───────────────────────────────────────────────┐ - │ Symbol Table │ - ├───────────────────────────────────────────────┤ - │ Series │ - ├───────────────────────────────────────────────┤ - │ Label Index │ - ├───────────────────────────────────────────────┤ - │ Postings │ - ├───────────────────────────────────────────────┤ - │ Body ... │ - ├───────────────────────────────────────────────┤ - │ Body ... │ - └───────────────────────────────────────────────┘ + ┌────────────────────────────┬─────────────────────┐ + │ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte> │ + ├────────────────────────────┴─────────────────────┤ + │ ┌──────────────────────────────────────────────┐ │ + │ │ Symbol Table │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Series │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Label Index │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Postings │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Body ... │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Body ... │ │ + │ ├──────────────────────────────────────────────┤ │ + │ │ Body ... │ │ + │ └──────────────────────────────────────────────┘ │ + └──────────────────────────────────────────────────┘ ``` @@ -35,15 +29,13 @@ The body is split into the following parts: The symbol table holds all strings encountered in our index. All other index sections just reference strings in the table as they are highly repetitive. -#### v1, section(`1`) - The section contains a sequence of the raw string data, each prefixed with the string's length. Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order. ``` - ┌────────────────────┬────────────────────┐ - │ version <1 byte> │ len <4 byte> │ - ├────────────────────┴────────────────────┤ + ┌─────────────────────────┬───────────────┐ + │ count(symbols) <4 byte> │ len <4 byte> │ + ├─────────────────────────┴───────────────┤ │ ┌─────────────────────┬───────────────┐ │ │ │ len(str_1) │ str_1 │ │ │ ├─────────────────────┴───────────────┤ │ @@ -52,38 +44,56 @@ Strings are referenced by pointing to the beginning of their length field. The s │ │ len(str_n) │ str_1 │ │ │ └─────────────────────┴───────────────┘ │ ├─────────────────────────────────────────┤ - │ CRC <4 byte> │ + │ CRC32 <4 byte> │ └─────────────────────────────────────────┘ ``` ### Series -#### v1, section(`1`) - The section contains a sequence of series that hold the label set of the series as well as the chunks within the block. The series are sorted lexicographically by their label sets. The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. ``` - ┌────────────┬─────────┬────────────┐ - │ series_1 │ . . . │ series_n │ - └────────────┴─────────┴────────────┘ + ┌───────────────────────────────────────┐ + │ count(series) <4 byte> │ + ├───────────────────────────────────────┤ + │ ┌───────────────────────────────────┐ │ + │ │ series_1 │ │ + │ ├───────────────────────────────────┤ │ + │ │ . . . │ │ + │ ├───────────────────────────────────┤ │ + │ │ series_n │ │ + │ └───────────────────────────────────┘ │ + └───────────────────────────────────────┘ ``` Every series holds a list of label pairs and chunks. The label pairs reference the symbol table and the chunks an address in one of the block's chunk files. ``` - ┌──────────────────┬────────────────────────────────────────────────────────────────────────┐ - │ │ ┌────────────────────────┬─────────────────────────┐ │ - │ #labels │ │ ref(l_i.name) │ ref(l_i.value) │ ... │ - │ │ └────────────────────────┴─────────────────────────┘ │ - ├──────────────────┼────────────────────────────────────────────────────────────────────────┤ - │ │ ┌───────────────────┬───────────────────┬────────────────────────┐ │ - │ #chunks │ │ c_i.mint │ c_i.maxt │ ref(c_i.data) │ ... │ - │ │ └───────────────────┴───────────────────┴────────────────────────┘ │ - ├──────────────────┴────────────────────────────────────────────────────────────────────────┤ - │ CRC32 <4 byte> │ - └───────────────────────────────────────────────────────────────────────────────────────────┘ + ┌─────────────────────────────────────────────────────────┐ + │ len │ + ├─────────────────────────────────────────────────────────┤ + │ ┌──────────────────┬──────────────────────────────────┐ │ + │ │ │ ┌──────────────────────────┐ │ │ + │ │ │ │ ref(l_i.name) │ │ │ + │ │ #labels │ ├──────────────────────────┤ ... │ │ + │ │ │ │ ref(l_i.value) │ │ │ + │ │ │ └──────────────────────────┘ │ │ + │ ├──────────────────┼──────────────────────────────────┤ │ + │ │ │ ┌──────────────────────────┐ │ │ + │ │ │ │ c_i.mint │ │ │ + │ │ │ ├──────────────────────────┤ │ │ + │ │ │ │ c_i.maxt │ │ │ + │ │ #chunks │ ├──────────────────────────┤ ... │ │ + │ │ │ │ ref(c_i.data) │ │ │ + │ │ │ ├──────────────────────────┤ │ │ + │ │ │ │ crc32(c_i.data) │ │ │ + │ │ │ └──────────────────────────┘ │ │ + │ └──────────────────┴──────────────────────────────────┘ │ + ├─────────────────────────────────────────────────────────┤ + │ CRC32 <4 byte> │ + └─────────────────────────────────────────────────────────┘ ``` The CRC checksum is calculated over the series contents of the index concatenated with the data of its chunks (with encoding byte, without length). diff --git a/chunks.go b/chunks.go index d828c1c13..78be77c22 100644 --- a/chunks.go +++ b/chunks.go @@ -17,6 +17,7 @@ import ( "bufio" "encoding/binary" "fmt" + "hash" "io" "os" @@ -41,6 +42,16 @@ type ChunkMeta struct { MinTime, MaxTime int64 // time range the data covers } +func (cm *ChunkMeta) hash(h hash.Hash) error { + if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { + return err + } + if _, err := h.Write(cm.Chunk.Bytes()); err != nil { + return err + } + return nil +} + // ChunkWriter serializes a time block of chunked series data. type ChunkWriter interface { // WriteChunks writes several chunks. The Chunk field of the ChunkMetas @@ -194,14 +205,14 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { for _, chk := range chks { chk.Ref = seq | uint64(w.n) - if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { - return err - } n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) if err := w.write(b[:n]); err != nil { return err } + if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { + return err + } if err := w.write(chk.Chunk.Bytes()); err != nil { return err } @@ -283,17 +294,16 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { if int(off) >= len(b) { return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) } - - enc := chunks.Encoding(b[off]) - b = b[off+1:] + b = b[off:] l, n := binary.Uvarint(b) if n < 0 { return nil, fmt.Errorf("reading chunk length failed") } b = b[n:] + enc := chunks.Encoding(b[0]) - c, err := chunks.FromData(enc, b[0:l]) + c, err := chunks.FromData(enc, b[1:1+l]) if err != nil { return nil, err } diff --git a/index.go b/index.go index 9f3da5019..2f3ce00d4 100644 --- a/index.go +++ b/index.go @@ -35,6 +35,8 @@ const ( MagicIndex = 0xBAAAD700 indexFormatV1 = 1 + + indexSeriesFormatV1 = 1 ) const compactionPageBytes = minSectorSize * 64 @@ -71,8 +73,8 @@ type indexWriterSeries struct { // serialization format. type indexWriter struct { f *os.File - bufw *bufio.Writer - n int64 + fbuf *bufio.Writer + pos int started bool // Reusable memory. @@ -102,8 +104,8 @@ func newIndexWriter(dir string) (*indexWriter, error) { iw := &indexWriter{ f: f, - bufw: bufio.NewWriterSize(f, 1<<22), - n: 0, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, // Reusable memory. b: make([]byte, 0, 1<<23), @@ -120,40 +122,19 @@ func newIndexWriter(dir string) (*indexWriter, error) { return iw, nil } -func (w *indexWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) +func (w *indexWriter) write(b []byte) error { + n, err := w.fbuf.Write(b) + w.pos += n return err } -// section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.bufw) - - b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], uint32(l)) - - if err := w.write(wr, b[:]); err != nil { - return errors.Wrap(err, "writing header") - } - - if err := f(wr); err != nil { - return errors.Wrap(err, "write contents") - } - if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil { - return errors.Wrap(err, "writing checksum") - } - return nil -} - func (w *indexWriter) writeMeta() error { - b := [8]byte{} + b := [5]byte{} binary.BigEndian.PutUint32(b[:4], MagicIndex) b[4] = flagStd - return w.write(w.bufw, b[:]) + return w.write(b[:]) } func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { @@ -181,24 +162,36 @@ func (w *indexWriter) writeSymbols() error { } sort.Strings(symbols) - // The start of the section plus a 5 byte section header are our base. - // TODO(fabxc): switch to relative offsets and hold sections in a TOC. - base := uint32(w.n) + 5 + buf := make([]byte, 8) - buf := [binary.MaxVarintLen32]byte{} - w.b = append(w.b[:0], flagStd) + // 8 byte header of symbol count and serialization length. + binary.BigEndian.PutUint32(buf[:4], uint32(len(symbols))) + + w.b = w.b[:0] + w.b = append(w.b, buf...) for _, s := range symbols { - w.symbols[s] = base + uint32(len(w.b)) + w.symbols[s] = uint32(w.pos + len(w.b)) - n := binary.PutUvarint(buf[:], uint64(len(s))) + n := binary.PutUvarint(buf, uint64(len(s))) w.b = append(w.b, buf[:n]...) w.b = append(w.b, s...) } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + binary.BigEndian.PutUint32(buf[:4], uint32(len(w.b))-8) + copy(w.b[4:], buf[:4]) + + w.crc32.Reset() + // Write checksum over contents excluding the 8 byte header. + if _, err := w.crc32.Write(w.b[8:]); err != nil { + return errors.Wrap(err, "calculate symbols CRC32 checksum") + } + w.b = w.crc32.Sum(w.b) + + if err := w.write(w.b); err != nil { + return errors.Wrap(err, "write symbols") + } + return nil } type indexWriterSeriesSlice []*indexWriterSeries @@ -219,17 +212,16 @@ func (w *indexWriter) writeSeries() error { } sort.Sort(series) - // Current end of file plus 5 bytes for section header. - // TODO(fabxc): switch to relative offsets. - base := uint32(w.n) + 5 - - w.b = w.b[:0] buf := make([]byte, binary.MaxVarintLen64) + // Header holds number of series. + binary.BigEndian.PutUint32(buf, uint32(len(series))) + if err := w.write(buf[:4]); err != nil { + return errors.Wrap(err, "write series count") + } + for _, s := range series { - // Write label set symbol references. - start := len(w.b) - s.offset = base + uint32(start) + w.b = w.b[:0] n := binary.PutUvarint(buf, uint64(len(s.labels))) w.b = append(w.b, buf[:n]...) @@ -253,28 +245,33 @@ func (w *indexWriter) writeSeries() error { n = binary.PutUvarint(buf, uint64(c.Ref)) w.b = append(w.b, buf[:n]...) + + w.crc32.Reset() + if err := c.hash(w.crc32); err != nil { + return errors.Wrap(err, "calculate chunk CRC32") + } + w.b = w.crc32.Sum(w.b) + } + + s.offset = uint32(w.pos) + + n = binary.PutUvarint(buf, uint64(len(w.b))) + if err := w.write(buf[:n]); err != nil { + return errors.Wrap(err, "write series data size") } - // Write checksum over series index entry and all its chunk data. w.crc32.Reset() - w.crc32.Write(w.b[start:]) - - for _, c := range s.chunks { - fmt.Println(c) - if _, err := w.crc32.Write([]byte{byte(c.Chunk.Encoding())}); err != nil { - return err - } - if _, err := w.crc32.Write(c.Chunk.Bytes()); err != nil { - return err - } + if _, err := w.crc32.Write(w.b); err != nil { + return errors.Wrap(err, "calculate series CRC32") } + w.b = w.crc32.Sum(w.b) - w.b = append(w.b, w.crc32.Sum(nil)...) + if err := w.write(w.b); err != nil { + return errors.Wrap(err, "write series data") + } } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + return nil } func (w *indexWriter) init() error { @@ -304,7 +301,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { w.labelIndexes = append(w.labelIndexes, hashEntry{ name: strings.Join(names, string(sep)), - offset: uint32(w.n), + offset: uint32(w.pos), }) buf := make([]byte, binary.MaxVarintLen32) @@ -312,21 +309,23 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { l := n + len(values)*4 - return w.section(l, flagStd, func(wr io.Writer) error { - // First byte indicates tuple size for index. - if err := w.write(wr, buf[:n]); err != nil { - return err - } + w.b = append(w.b[:0], flagStd, 0, 0, 0, 0) + binary.BigEndian.PutUint32(w.b[1:], uint32(l)) - for _, v := range valt.s { - binary.BigEndian.PutUint32(buf, w.symbols[v]) + w.b = append(w.b, buf[:n]...) - if err := w.write(wr, buf[:4]); err != nil { - return err - } - } - return nil - }) + for _, v := range valt.s { + binary.BigEndian.PutUint32(buf, w.symbols[v]) + w.b = append(w.b, buf[:4]...) + } + + w.crc32.Reset() + if _, err := w.crc32.Write(w.b[5:]); err != nil { + return errors.Wrap(err, "calculate label index CRC32 checksum") + } + w.b = w.crc32.Sum(w.b) + + return w.write(w.b) } func (w *indexWriter) WritePostings(name, value string, it Postings) error { @@ -340,7 +339,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { w.postings = append(w.postings, hashEntry{ name: key, - offset: uint32(w.n), + offset: uint32(w.pos), }) // Order of the references in the postings list does not imply order @@ -361,7 +360,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { sort.Sort(uint32slice(refs)) - w.b = w.b[:0] + w.b = append(w.b[:0], flagStd, 0, 0, 0, 0) buf := make([]byte, 4) for _, r := range refs { @@ -371,9 +370,15 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { w.uint32s = refs[:0] - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5)) + + w.crc32.Reset() + if _, err := w.crc32.Write(w.b[5:]); err != nil { + return errors.Wrap(err, "calculate label index CRC32 checksum") + } + w.b = w.crc32.Sum(w.b) + + return w.write(w.b) } type uint32slice []uint32 @@ -388,7 +393,7 @@ type hashEntry struct { } func (w *indexWriter) writeHashmap(h []hashEntry) error { - w.b = w.b[:0] + w.b = append(w.b[:0], flagStd, 0, 0, 0, 0) buf := [binary.MaxVarintLen32]byte{} for _, e := range h { @@ -400,19 +405,25 @@ func (w *indexWriter) writeHashmap(h []hashEntry) error { w.b = append(w.b, buf[:n]...) } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5)) + + w.crc32.Reset() + if _, err := w.crc32.Write(w.b[5:]); err != nil { + return errors.Wrap(err, "calculate label index CRC32 checksum") + } + w.b = w.crc32.Sum(w.b) + + return w.write(w.b) } func (w *indexWriter) finalize() error { // Write out hash maps to jump to correct label index and postings sections. - lo := uint32(w.n) + lo := uint32(w.pos) if err := w.writeHashmap(w.labelIndexes); err != nil { return err } - po := uint32(w.n) + po := uint32(w.pos) if err := w.writeHashmap(w.postings); err != nil { return err } @@ -425,14 +436,14 @@ func (w *indexWriter) finalize() error { binary.BigEndian.PutUint32(b[:4], lo) binary.BigEndian.PutUint32(b[4:], po) - return w.write(w.bufw, b[:]) + return w.write(b[:]) } func (w *indexWriter) Close() error { if err := w.finalize(); err != nil { return err } - if err := w.bufw.Flush(); err != nil { + if err := w.fbuf.Flush(); err != nil { return err } if err := fileutil.Fsync(w.f); err != nil { @@ -588,7 +599,7 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) { end := int(o) + n + int(l) if end > len(r.b) { - return "", errors.New("invalid length") + return "", errors.Errorf("invalid length %d", l) } b := r.b[int(o)+n : end] @@ -640,12 +651,16 @@ func (r *indexReader) LabelIndices() ([][]string, error) { } func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) + // Read away length of series data. + _, n := binary.Uvarint(r.b[ref:]) + b := r.b[int(ref)+n:] + + k, n := binary.Uvarint(b) if n < 1 { return nil, nil, errors.Wrap(errInvalidSize, "number of labels") } - b := r.b[int(ref)+n:] + b = b[n:] lbls := make(labels.Labels, 0, k) for i := 0; i < 2*int(k); i += 2 { @@ -703,6 +718,9 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { } b = b[n:] + // TODO(fabxc): read and potentially verify checksum. + b = b[4:] + chunks = append(chunks, &ChunkMeta{ Ref: o, MinTime: firstTime, @@ -710,6 +728,8 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { }) } + // TODO(fabxc): read and potentially verify checksum. + return lbls, chunks, nil } @@ -734,6 +754,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { if len(b)%4 != 0 { return nil, errors.Wrap(errInvalidSize, "plain postings entry") } + return newBigEndianPostings(b), nil }