diff --git a/Documentation/format/chunks.md b/Documentation/format/chunks.md
new file mode 100644
index 000000000..51ac1242c
--- /dev/null
+++ b/Documentation/format/chunks.md
@@ -0,0 +1,15 @@
+# Chunks Disk Format
+
+The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB.
+
+Chunks in the files are referenced from the index by a chunk reference, which packs the in-file offset into its lower 4 bytes and the segment sequence number into its upper 4 bytes.
+
+```
+┌────────────────────────────────────────┬──────────────────────┐
+│ magic(0x85BD40DD) <4 byte>             │ version(1) <1 byte>  │
+├────────────────────────────────────────┴──────────────────────┤
+│ ┌───────────────┬───────────────────┬──────┬────────────────┐ │
+│ │ len           │ encoding <1 byte> │ data │ CRC32 <4 byte> │ │
+│ └───────────────┴───────────────────┴──────┴────────────────┘ │
+└───────────────────────────────────────────────────────────────┘
+```
diff --git a/Documentation/format/index.md b/Documentation/format/index.md
new file mode 100644
index 000000000..0bb0b5538
--- /dev/null
+++ b/Documentation/format/index.md
@@ -0,0 +1,206 @@
+# Index Disk Format
+
+The following describes the format of the `index` file found in each block directory.
+It is terminated by a table of contents which serves as an entry point into the index.
+
+```
+┌────────────────────────────┬─────────────────────┐
+│ magic(0xBAAAD700) <4b>     │ version(1) <1 byte> │
+├────────────────────────────┴─────────────────────┤
+│ ┌──────────────────────────────────────────────┐ │
+│ │                 Symbol Table                 │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                    Series                    │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                 Label Index 1                │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                      ...                     │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                 Label Index N                │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │               Label Index Table              │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                  Postings 1                  │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                      ...                     │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                  Postings N                  │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                Postings Table                │ │
+│ ├──────────────────────────────────────────────┤ │
+│ │                      TOC                     │ │
+│ └──────────────────────────────────────────────┘ │
+└──────────────────────────────────────────────────┘
+```
+
+When the index is written, an arbitrary number of padding bytes may be added between the main sections outlined above. When sequentially scanning through the file, any zero bytes after a section's specified length must be skipped.
+
+Most of the sections described below start with a `len` field. It always specifies the number of bytes that follow it, up to but not including the trailing CRC32 checksum. The checksum is always calculated over exactly those `len` bytes.
+
+
+### Symbol Table
+
+The symbol table holds a sorted list of deduplicated strings that occur in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
+
+The section contains a sequence of string entries, each prefixed with the string's length in raw bytes. All strings are utf-8 encoded.
+Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order.
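+
+For illustration, here is a minimal sketch of how a reader might resolve such a symbol reference against the raw index bytes (a hypothetical helper, not the reference implementation; it mirrors the layout shown below):
+
+```go
+package index
+
+import (
+	"encoding/binary"
+	"errors"
+)
+
+// lookupSymbol reads the length-prefixed, utf-8 encoded string that a
+// symbol reference points to. ref is the offset of the string's length field.
+func lookupSymbol(index []byte, ref uint32) (string, error) {
+	if int(ref) >= len(index) {
+		return "", errors.New("symbol offset out of bounds")
+	}
+	l, n := binary.Uvarint(index[ref:])
+	if n < 1 {
+		return "", errors.New("invalid symbol length")
+	}
+	end := int(ref) + n + int(l)
+	if end > len(index) {
+		return "", errors.New("symbol exceeds index size")
+	}
+	return string(index[int(ref)+n : end]), nil
+}
+```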
+
+```
+┌────────────────────┬─────────────────────┐
+│ len <4b>           │ #symbols <4b>       │
+├────────────────────┴─────────────────────┤
+│ ┌──────────────────────┬───────────────┐ │
+│ │ len(str_1)           │ str_1         │ │
+│ ├──────────────────────┴───────────────┤ │
+│ │                . . .                 │ │
+│ ├──────────────────────┬───────────────┤ │
+│ │ len(str_n)           │ str_n         │ │
+│ └──────────────────────┴───────────────┘ │
+├──────────────────────────────────────────┤
+│ CRC32 <4b>                               │
+└──────────────────────────────────────────┘
+```
+
+
+### Series
+
+The section contains a sequence of series that hold the label set of the series as well as its chunks within the block. The series are sorted lexicographically by their label sets.
+The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thus, a sorted list of series IDs implies a lexicographically sorted list of series label sets.
+
+```
+┌───────────────────────────────────────┐
+│ #series <4b>                          │
+├───────────────────────────────────────┤
+│ ┌───────────────────────────────────┐ │
+│ │   series_1                        │ │
+│ ├───────────────────────────────────┤ │
+│ │   . . .                           │ │
+│ ├───────────────────────────────────┤ │
+│ │   series_n                        │ │
+│ └───────────────────────────────────┘ │
+└───────────────────────────────────────┘
+```
+
+Every series entry first holds its number of labels, followed by tuples of symbol table references that contain the label name and value. The label pairs are lexicographically sorted.
+After the labels, the number of indexed chunks is encoded, followed by a sequence of metadata entries containing each chunk's minimum and maximum timestamp and a reference to its position in the chunk file. Holding the time range data in the index allows dropping chunks irrelevant to queried time ranges without accessing them directly.
+
+```
+┌─────────────────────────────────────────────────────────┐
+│ len                                                     │
+├─────────────────────────────────────────────────────────┤
+│ ┌──────────────────┬──────────────────────────────────┐ │
+│ │                  │ ┌──────────────────────────┐     │ │
+│ │                  │ │ ref(l_i.name)            │     │ │
+│ │     #labels      │ ├──────────────────────────┤ ... │ │
+│ │                  │ │ ref(l_i.value)           │     │ │
+│ │                  │ └──────────────────────────┘     │ │
+│ ├──────────────────┼──────────────────────────────────┤ │
+│ │                  │ ┌──────────────────────────┐     │ │
+│ │                  │ │ c_i.mint                 │     │ │
+│ │                  │ ├──────────────────────────┤     │ │
+│ │     #chunks      │ │ c_i.maxt                 │     │ │
+│ │                  │ ├──────────────────────────┤ ... │ │
+│ │                  │ │ ref(c_i.data)            │     │ │
+│ │                  │ └──────────────────────────┘     │ │
+│ └──────────────────┴──────────────────────────────────┘ │
+├─────────────────────────────────────────────────────────┤
+│ CRC32 <4b>                                              │
+└─────────────────────────────────────────────────────────┘
+```
+
+
+
+### Label Index
+
+A label index section indexes the existing (combined) values for one or more label names.
+The `#names` field determines the number of indexed label names, followed by the total number of entries in the `#entries` field. The body holds `#entries` symbol table reference tuples of length `#names`. The value tuples are sorted in lexicographically increasing order.
+
+```
+┌───────────────┬────────────────┬────────────────┐
+│ len <4b>      │ #names <4b>    │ #entries <4b>  │
+├───────────────┴────────────────┴────────────────┤
+│ ┌─────────────────────────────────────────────┐ │
+│ │ ref(value_0) <4b>                           │ │
+│ ├─────────────────────────────────────────────┤ │
+│ │ ...                                         │ │
+│ ├─────────────────────────────────────────────┤ │
+│ │ ref(value_n) <4b>                           │ │
+│ └─────────────────────────────────────────────┘ │
+│                      . . .                      │
+├─────────────────────────────────────────────────┤
+│ CRC32 <4b>                                      │
+└─────────────────────────────────────────────────┘
+```
+
+The sequence of label index sections is finalized by an offset table pointing to the beginning of each label index section for a given set of label names.
+
+### Postings
+
+Postings sections store monotonically increasing lists of series references that contain a given label pair associated with the list.
+
+```
+┌────────────────────┬────────────────────┐
+│ len <4b>           │ #entries <4b>      │
+├────────────────────┴────────────────────┤
+│ ┌─────────────────────────────────────┐ │
+│ │ ref(series_1) <4b>                  │ │
+│ ├─────────────────────────────────────┤ │
+│ │ ...                                 │ │
+│ ├─────────────────────────────────────┤ │
+│ │ ref(series_n) <4b>                  │ │
+│ └─────────────────────────────────────┘ │
+├─────────────────────────────────────────┤
+│ CRC32 <4b>                              │
+└─────────────────────────────────────────┘
+```
+
+The sequence of postings sections is finalized by an offset table pointing to the beginning of each postings section for a given label pair.
+
+### Offset Table
+
+An offset table stores a sequence of entries that map a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded.
+
+```
+┌─────────────────────┬────────────────────┐
+│ #entries <4b>       │ len <4b>           │
+├─────────────────────┴────────────────────┤
+│ ┌──────────────────────────────────────┐ │
+│ │  n = #strs                           │ │
+│ ├──────────────────────┬───────────────┤ │
+│ │ len(str_1)           │ str_1         │ │
+│ ├──────────────────────┴───────────────┤ │
+│ │  ...                                 │ │
+│ ├──────────────────────┬───────────────┤ │
+│ │ len(str_n)           │ str_n         │ │
+│ ├──────────────────────┴───────────────┤ │
+│ │  offset                              │ │
+│ └──────────────────────────────────────┘ │
+│                  . . .                   │
+├──────────────────────────────────────────┤
+│ CRC32 <4b>                               │
+└──────────────────────────────────────────┘
+```
+
+
+### TOC
+
+The table of contents serves as an entry point to the entire index and points to the various sections in the file.
+If a reference is zero, it indicates that the respective section does not exist and empty results should be returned upon lookup.
+
+```
+┌─────────────────────────────────────────┐
+│ ref(symbols) <8b>                       │
+├─────────────────────────────────────────┤
+│ ref(series) <8b>                        │
+├─────────────────────────────────────────┤
+│ ref(label indices start) <8b>           │
+├─────────────────────────────────────────┤
+│ ref(label indices table) <8b>           │
+├─────────────────────────────────────────┤
+│ ref(postings start) <8b>                │
+├─────────────────────────────────────────┤
+│ ref(postings table) <8b>                │
+├─────────────────────────────────────────┤
+│ CRC32 <4b>                              │
+└─────────────────────────────────────────┘
+```
\ No newline at end of file
diff --git a/chunks.go b/chunks.go
index aa44b259e..77663359c 100644
--- a/chunks.go
+++ b/chunks.go
@@ -43,6 +43,17 @@ type ChunkMeta struct {
 	MinTime, MaxTime int64 // time range the data covers
 }
 
+// writeHash writes the chunk encoding and raw data into the provided hash.
+func (cm *ChunkMeta) writeHash(h hash.Hash) error {
+	if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
+		return err
+	}
+	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
+		return err
+	}
+	return nil
+}
+
 // ChunkWriter serializes a time block of chunked series data.
 type ChunkWriter interface {
 	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
@@ -165,8 +176,8 @@ func (w *chunkWriter) cut() error {
 	return nil
 }
 
-func (w *chunkWriter) write(wr io.Writer, b []byte) error {
-	n, err := wr.Write(b)
+func (w *chunkWriter) write(b []byte) error {
+	n, err := w.wbuf.Write(b)
 	w.n += int64(n)
 	return err
 }
@@ -187,14 +198,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
 		}
 	}
 
-	// Write chunks sequentially and set the reference field in the ChunkMeta.
-	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.wbuf)
-
 	b := make([]byte, binary.MaxVarintLen32)
 	n := binary.PutUvarint(b, uint64(len(chks)))
 
-	if err := w.write(wr, b[:n]); err != nil {
+	if err := w.write(b[:n]); err != nil {
 		return err
 	}
 	seq := uint64(w.seq()) << 32
@@ -204,21 +211,25 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
 
 		n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
 
-		if err := w.write(wr, b[:n]); err != nil {
+		if err := w.write(b[:n]); err != nil {
 			return err
 		}
-		if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
+		if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
 			return err
 		}
-		if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
+		if err := w.write(chk.Chunk.Bytes()); err != nil {
 			return err
 		}
-		chk.Chunk = nil
-	}
 
-	if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
-		return err
+		w.crc32.Reset()
+		if err := chk.writeHash(w.crc32); err != nil {
+			return err
+		}
+		if err := w.write(w.crc32.Sum(nil)); err != nil {
+			return err
+		}
 	}
+
 	return nil
 }
diff --git a/db.go b/db.go
index 66bd6679a..13ab5b7eb 100644
--- a/db.go
+++ b/db.go
@@ -95,8 +95,6 @@ type Appender interface {
 	Rollback() error
 }
 
-const sep = '\xff'
-
 // DB handles reads and writes of time series falling into
 // a hashed partition of a seriedb.
 type DB struct {
diff --git a/encoding_helpers.go b/encoding_helpers.go
new file mode 100644
index 000000000..91f73a54c
--- /dev/null
+++ b/encoding_helpers.go
@@ -0,0 +1,157 @@
+package tsdb
+
+import (
+	"encoding/binary"
+	"hash"
+	"unsafe"
+)
+
+// encbuf is a helper type to populate a byte slice with various types.
+type encbuf struct {
+	b []byte
+	c [binary.MaxVarintLen64]byte
+}
+
+func (e *encbuf) reset()      { e.b = e.b[:0] }
+func (e *encbuf) get() []byte { return e.b }
+func (e *encbuf) len() int    { return len(e.b) }
+
+func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
+func (e *encbuf) putBytes(b []byte)  { e.b = append(e.b, b...) }
+func (e *encbuf) putByte(c byte)     { e.b = append(e.b, c) }
+
+func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
+func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
+func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
+func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }
+
+func (e *encbuf) putBE32(x uint32) {
+	binary.BigEndian.PutUint32(e.c[:], x)
+	e.b = append(e.b, e.c[:4]...)
+}
+
+func (e *encbuf) putBE64(x uint64) {
+	binary.BigEndian.PutUint64(e.c[:], x)
+	e.b = append(e.b, e.c[:8]...)
+}
+
+func (e *encbuf) putUvarint64(x uint64) {
+	n := binary.PutUvarint(e.c[:], x)
+	e.b = append(e.b, e.c[:n]...)
+}
+
+func (e *encbuf) putVarint64(x int64) {
+	n := binary.PutVarint(e.c[:], x)
+	e.b = append(e.b, e.c[:n]...)
+}
+
+// putUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes!).
+func (e *encbuf) putUvarintStr(s string) {
+	b := *(*[]byte)(unsafe.Pointer(&s))
+	e.putUvarint(len(b))
+	e.putString(s)
+}
+
+// putHash appends a hash over the buffer's current contents to the buffer.
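+// It is meant for appending the trailing CRC32 of a section: the section body
+// is accumulated in the buffer first, so the checksum covers exactly those
+// bytes, matching the layout described in Documentation/format/index.md.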
+func (e *encbuf) putHash(h hash.Hash) {
+	h.Reset()
+	_, err := h.Write(e.b)
+	if err != nil {
+		panic(err) // The CRC32 implementation does not error
+	}
+	e.b = h.Sum(e.b)
+}
+
+// decbuf provides safe methods to extract data from a byte slice. It does all
+// necessary bounds checking and advancing of the byte slice.
+// Several values can be extracted without checking for errors. However, before
+// using any of them, the err() method must be checked.
+type decbuf struct {
+	b []byte
+	e error
+}
+
+func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
+func (d *decbuf) be32int() int { return int(d.be32()) }
+
+func (d *decbuf) uvarintStr() string {
+	l := d.uvarint64()
+	if d.e != nil {
+		return ""
+	}
+	if len(d.b) < int(l) {
+		d.e = errInvalidSize
+		return ""
+	}
+	s := yoloString(d.b[:l])
+	d.b = d.b[l:]
+	return s
+}
+
+func (d *decbuf) varint64() int64 {
+	if d.e != nil {
+		return 0
+	}
+	x, n := binary.Varint(d.b)
+	if n < 1 {
+		d.e = errInvalidSize
+		return 0
+	}
+	d.b = d.b[n:]
+	return x
+}
+
+func (d *decbuf) uvarint64() uint64 {
+	if d.e != nil {
+		return 0
+	}
+	x, n := binary.Uvarint(d.b)
+	if n < 1 {
+		d.e = errInvalidSize
+		return 0
+	}
+	d.b = d.b[n:]
+	return x
+}
+
+func (d *decbuf) be64() uint64 {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 8 {
+		d.e = errInvalidSize
+		return 0
+	}
+	x := binary.BigEndian.Uint64(d.b)
+	d.b = d.b[8:]
+	return x
+}
+
+func (d *decbuf) be32() uint32 {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 4 {
+		d.e = errInvalidSize
+		return 0
+	}
+	x := binary.BigEndian.Uint32(d.b)
+	d.b = d.b[4:]
+	return x
+}
+
+func (d *decbuf) decbuf(l int) decbuf {
+	if d.e != nil {
+		return decbuf{e: d.e}
+	}
+	if l > len(d.b) {
+		return decbuf{e: errInvalidSize}
+	}
+	r := decbuf{b: d.b[:l]}
+	d.b = d.b[l:]
+	return r
+}
+
+func (d *decbuf) err() error  { return d.e }
+func (d *decbuf) len() int    { return len(d.b) }
+func (d *decbuf) get() []byte { return d.b }
diff --git a/index.go b/index.go
index 73b65da30..c0e96381f 100644
--- a/index.go
+++ b/index.go
@@ -25,6 +25,8 @@ import (
 	"sort"
 	"strings"
 
+	"math"
+
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/labels"
@@ -39,6 +41,44 @@ const (
 
 const compactionPageBytes = minSectorSize * 64
 
+type indexWriterSeries struct {
+	labels labels.Labels
+	chunks []*ChunkMeta // series file offset of chunks
+	offset uint32       // index file offset of series reference
+}
+
+type indexWriterSeriesSlice []*indexWriterSeries
+
+func (s indexWriterSeriesSlice) Len() int      { return len(s) }
+func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s indexWriterSeriesSlice) Less(i, j int) bool {
+	return labels.Compare(s[i].labels, s[j].labels) < 0
+}
+
+type indexWriterStage uint8
+
+const (
+	idxStagePopulate indexWriterStage = iota
+	idxStageLabelIndex
+	idxStagePostings
+	idxStageDone
+)
+
+func (s indexWriterStage) String() string {
+	switch s {
+	case idxStagePopulate:
+		return "populate"
+	case idxStageLabelIndex:
+		return "label index"
+	case idxStagePostings:
+		return "postings"
+	case idxStageDone:
+		return "done"
+	}
+	return ""
+}
+
 // IndexWriter serializes the index for a block of series data.
 // The methods must generally be called in the order in which they are specified.
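+// Specifically, all AddSeries calls must precede the first WriteLabelIndex
+// call, which in turn must precede the first WritePostings call; the stage
+// transitions in ensureStage below enforce this ordering.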
 type IndexWriter interface {
@@ -61,22 +101,19 @@ type IndexWriter interface {
 	Close() error
 }
 
-type indexWriterSeries struct {
-	labels labels.Labels
-	chunks []*ChunkMeta // series file offset of chunks
-	offset uint32       // index file offset of series reference
-}
-
 // indexWriter implements the IndexWriter interface for the standard
 // serialization format.
 type indexWriter struct {
-	f       *os.File
-	bufw    *bufio.Writer
-	n       int64
-	started bool
+	f    *os.File
+	fbuf *bufio.Writer
+	pos  uint64
+
+	toc   indexTOC
+	stage indexWriterStage
 
 	// Reusable memory.
-	b       []byte
+	buf1    encbuf
+	buf2    encbuf
 	uint32s []uint32
 
 	series map[uint32]*indexWriterSeries
@@ -87,6 +124,15 @@ type indexWriter struct {
 	crc32 hash.Hash
 }
 
+type indexTOC struct {
+	symbols           uint64
+	series            uint64
+	labelIndices      uint64
+	labelIndicesTable uint64
+	postings          uint64
+	postingsTable     uint64
+}
+
 func newIndexWriter(dir string) (*indexWriter, error) {
 	df, err := fileutil.OpenDir(dir)
 	if err != nil {
@@ -101,12 +147,14 @@ func newIndexWriter(dir string) (*indexWriter, error) {
 	}
 
 	iw := &indexWriter{
-		f:    f,
-		bufw: bufio.NewWriterSize(f, 1<<22),
-		n:    0,
+		f:     f,
+		fbuf:  bufio.NewWriterSize(f, 1<<22),
+		pos:   0,
+		stage: idxStagePopulate,
 
 		// Reusable memory.
-		b:       make([]byte, 0, 1<<23),
+		buf1:    encbuf{b: make([]byte, 0, 1<<22)},
+		buf2:    encbuf{b: make([]byte, 0, 1<<22)},
 		uint32s: make([]uint32, 0, 1<<15),
 
 		// Caches.
@@ -120,40 +168,87 @@ func newIndexWriter(dir string) (*indexWriter, error) {
 	return iw, nil
 }
 
-func (w *indexWriter) write(wr io.Writer, b []byte) error {
-	n, err := wr.Write(b)
-	w.n += int64(n)
-	return err
+func (w *indexWriter) write(bufs ...[]byte) error {
+	for _, b := range bufs {
+		n, err := w.fbuf.Write(b)
+		w.pos += uint64(n)
+		if err != nil {
+			return err
+		}
+		// For now the index file must not grow beyond 4GiB. Some of the fixed-sized
+		// offset references in v1 are only 4 bytes large.
+		// Once we move to compressed/varint representations in those areas, this limitation
+		// can be lifted.
+		if w.pos > math.MaxUint32 {
+			return errors.Errorf("exceeding max size of 4GiB")
+		}
+	}
+	return nil
 }
 
-// section writes a CRC32 checksummed section of length l and guarded by flag.
-func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
-	w.crc32.Reset()
-	wr := io.MultiWriter(w.crc32, w.bufw)
-
-	b := [5]byte{flag, 0, 0, 0, 0}
-	binary.BigEndian.PutUint32(b[1:], uint32(l))
+// addPadding adds zero byte padding until the file size is a multiple of n.
+func (w *indexWriter) addPadding(n int) error {
+	p := int(w.pos) % n
+	if p == 0 {
+		return nil
+	}
+	return errors.Wrap(w.write(make([]byte, n-p)), "add padding")
+}
 
-	if err := w.write(wr, b[:]); err != nil {
-		return errors.Wrap(err, "writing header")
+// ensureStage handles transitions between write stages and ensures that IndexWriter
+// methods are called in an order valid for the implementation.
+func (w *indexWriter) ensureStage(s indexWriterStage) error {
+	if w.stage == s {
+		return nil
+	}
+	if w.stage > s {
+		return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
 	}
-	if err := f(wr); err != nil {
-		return errors.Wrap(err, "write contents")
+
+	// Complete population stage by writing symbols and series.
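+	// Both are buffered in memory until this first stage transition, at which
+	// point all series and their symbol references are known.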
+	if w.stage == idxStagePopulate {
+		w.toc.symbols = w.pos
+		if err := w.writeSymbols(); err != nil {
+			return err
+		}
+		w.toc.series = w.pos
+		if err := w.writeSeries(); err != nil {
+			return err
+		}
 	}
-	if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
-		return errors.Wrap(err, "writing checksum")
+
+	// Mark start of sections in table of contents.
+	switch s {
+	case idxStageLabelIndex:
+		w.toc.labelIndices = w.pos
+
+	case idxStagePostings:
+		w.toc.labelIndicesTable = w.pos
+		if err := w.writeOffsetTable(w.labelIndexes); err != nil {
+			return err
+		}
+		w.toc.postings = w.pos
+
+	case idxStageDone:
+		w.toc.postingsTable = w.pos
+		if err := w.writeOffsetTable(w.postings); err != nil {
+			return err
+		}
+		if err := w.writeTOC(); err != nil {
+			return err
+		}
 	}
+
+	w.stage = s
 	return nil
 }
 
 func (w *indexWriter) writeMeta() error {
-	b := [8]byte{}
-
-	binary.BigEndian.PutUint32(b[:4], MagicIndex)
-	b[4] = flagStd
+	w.buf1.reset()
+	w.buf1.putBE32(MagicIndex)
+	w.buf1.putByte(indexFormatV1)
 
-	return w.write(w.bufw, b[:])
+	return w.write(w.buf1.get())
 }
 
 func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
@@ -181,33 +276,27 @@ func (w *indexWriter) writeSymbols() error {
 	}
 	sort.Strings(symbols)
 
-	// The start of the section plus a 5 byte section header are our base.
-	// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
-	base := uint32(w.n) + 5
+	const headerSize = 4
 
-	buf := [binary.MaxVarintLen32]byte{}
-	w.b = append(w.b[:0], flagStd)
+	w.buf1.reset()
+	w.buf2.reset()
+
+	w.buf2.putBE32int(len(symbols))
 
 	for _, s := range symbols {
-		w.symbols[s] = base + uint32(len(w.b))
+		w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
 
-		n := binary.PutUvarint(buf[:], uint64(len(s)))
-		w.b = append(w.b, buf[:n]...)
-		w.b = append(w.b, s...)
+		// NOTE: len(s) counts the string's raw bytes, not runes, which is
+		// exactly what the on-disk length prefix must hold for utf-8 strings.
+		w.buf2.putUvarintStr(s)
 	}
 
-	return w.section(len(w.b), flagStd, func(wr io.Writer) error {
-		return w.write(wr, w.b)
-	})
-}
-
-type indexWriterSeriesSlice []*indexWriterSeries
-
-func (s indexWriterSeriesSlice) Len() int      { return len(s) }
-func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+	w.buf1.putBE32int(w.buf2.len())
+	w.buf2.putHash(w.crc32)
 
-func (s indexWriterSeriesSlice) Less(i, j int) bool {
-	return labels.Compare(s[i].labels, s[j].labels) < 0
+	err := w.write(w.buf1.get(), w.buf2.get())
+	return errors.Wrap(err, "write symbols")
 }
 
 func (w *indexWriter) writeSeries() error {
@@ -219,64 +308,52 @@ func (w *indexWriter) writeSeries() error {
 	}
 	sort.Sort(series)
 
-	// Current end of file plus 5 bytes for section header.
-	// TODO(fabxc): switch to relative offsets.
-	base := uint32(w.n) + 5
+	// Header holds number of series.
+	w.buf1.reset()
+	w.buf1.putBE32int(len(series))
 
-	w.b = w.b[:0]
-	buf := make([]byte, binary.MaxVarintLen64)
+	if err := w.write(w.buf1.get()); err != nil {
+		return errors.Wrap(err, "write series count")
+	}
 
 	for _, s := range series {
-		// Write label set symbol references.
-		s.offset = base + uint32(len(w.b))
+		s.offset = uint32(w.pos)
 
-		n := binary.PutUvarint(buf, uint64(len(s.labels)))
-		w.b = append(w.b, buf[:n]...)
+		w.buf2.reset()
+		w.buf2.putUvarint(len(s.labels))
 
 		for _, l := range s.labels {
-			n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
-			w.b = append(w.b, buf[:n]...)
- n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - w.b = append(w.b, buf[:n]...) + w.buf2.putUvarint32(w.symbols[l.Name]) + w.buf2.putUvarint32(w.symbols[l.Value]) } - // Write chunks meta data including reference into chunk file. - n = binary.PutUvarint(buf, uint64(len(s.chunks))) - w.b = append(w.b, buf[:n]...) + w.buf2.putUvarint(len(s.chunks)) for _, c := range s.chunks { - n = binary.PutVarint(buf, c.MinTime) - w.b = append(w.b, buf[:n]...) - n = binary.PutVarint(buf, c.MaxTime) - w.b = append(w.b, buf[:n]...) - - n = binary.PutUvarint(buf, uint64(c.Ref)) - w.b = append(w.b, buf[:n]...) + w.buf2.putVarint64(c.MinTime) + w.buf2.putVarint64(c.MaxTime) + w.buf2.putUvarint64(c.Ref) } - } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} + w.buf1.reset() + w.buf1.putUvarint(w.buf2.len()) -func (w *indexWriter) init() error { - if err := w.writeSymbols(); err != nil { - return err - } - if err := w.writeSeries(); err != nil { - return err + w.buf2.putHash(w.crc32) + + if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { + return errors.Wrap(err, "write series data") + } } - w.started = true return nil } func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { - if !w.started { - if err := w.init(); err != nil { - return err - } + if len(values)%len(names) != 0 { + return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) + } + if err := w.ensureStage(idxStageLabelIndex); err != nil { + return errors.Wrap(err, "ensure stage") } valt, err := newStringTuples(values, len(names)) @@ -285,45 +362,84 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } sort.Sort(valt) + // Align beginning to 4 bytes for more efficient index list scans. + if err := w.addPadding(4); err != nil { + return err + } + w.labelIndexes = append(w.labelIndexes, hashEntry{ - name: strings.Join(names, string(sep)), - offset: uint32(w.n), + keys: names, + offset: w.pos, }) - buf := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(buf, uint64(len(names))) + w.buf2.reset() + w.buf2.putBE32int(len(names)) + w.buf2.putBE32int(valt.Len()) - l := n + len(values)*4 + for _, v := range valt.s { + w.buf2.putBE32(w.symbols[v]) + } - return w.section(l, flagStd, func(wr io.Writer) error { - // First byte indicates tuple size for index. - if err := w.write(wr, buf[:n]); err != nil { - return err - } + w.buf1.reset() + w.buf1.putBE32int(w.buf2.len()) + + w.buf2.putHash(w.crc32) + + err = w.write(w.buf1.get(), w.buf2.get()) + return errors.Wrap(err, "write label index") +} - for _, v := range valt.s { - binary.BigEndian.PutUint32(buf, w.symbols[v]) +// writeOffsetTable writes a sequence of readable hash entries. 
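+// Each entry holds its number of key strings, the uvarint-length-prefixed
+// keys themselves, and the file offset of the section the keys refer to,
+// mirroring the offset table layout in Documentation/format/index.md.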
+func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { + w.buf1.reset() + w.buf1.putBE32int(len(entries)) - if err := w.write(wr, buf[:4]); err != nil { - return err - } + w.buf2.reset() + + for _, e := range entries { + w.buf2.putUvarint(len(e.keys)) + for _, k := range e.keys { + w.buf2.putUvarintStr(k) } - return nil - }) + w.buf2.putUvarint64(e.offset) + } + + w.buf1.putBE32int(w.buf2.len()) + w.buf2.putHash(w.crc32) + + return w.write(w.buf1.get(), w.buf2.get()) +} + +const indexTOCLen = 6*8 + 4 + +func (w *indexWriter) writeTOC() error { + w.buf1.reset() + + w.buf1.putBE64(w.toc.symbols) + w.buf1.putBE64(w.toc.series) + w.buf1.putBE64(w.toc.labelIndices) + w.buf1.putBE64(w.toc.labelIndicesTable) + w.buf1.putBE64(w.toc.postings) + w.buf1.putBE64(w.toc.postingsTable) + + w.buf1.putHash(w.crc32) + + return w.write(w.buf1.get()) } func (w *indexWriter) WritePostings(name, value string, it Postings) error { - if !w.started { - if err := w.init(); err != nil { - return err - } + if err := w.ensureStage(idxStagePostings); err != nil { + return errors.Wrap(err, "ensure stage") } - key := name + string(sep) + value + // Align beginning to 4 bytes for more efficient postings list scans. + if err := w.addPadding(4); err != nil { + return err + } w.postings = append(w.postings, hashEntry{ - name: key, - offset: uint32(w.n), + keys: []string{name, value}, + offset: w.pos, }) // Order of the references in the postings list does not imply order @@ -341,22 +457,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { if err := it.Err(); err != nil { return err } - sort.Sort(uint32slice(refs)) - w.b = w.b[:0] - buf := make([]byte, 4) + w.buf2.reset() + w.buf2.putBE32int(len(refs)) for _, r := range refs { - binary.BigEndian.PutUint32(buf, r) - w.b = append(w.b, buf...) + w.buf2.putBE32(r) } - w.uint32s = refs[:0] + w.buf1.reset() + w.buf1.putBE32int(w.buf2.len()) - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + w.buf2.putHash(w.crc32) + + err := w.write(w.buf1.get(), w.buf2.get()) + return errors.Wrap(err, "write postings") } type uint32slice []uint32 @@ -366,56 +482,15 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } type hashEntry struct { - name string - offset uint32 -} - -func (w *indexWriter) writeHashmap(h []hashEntry) error { - w.b = w.b[:0] - buf := [binary.MaxVarintLen32]byte{} - - for _, e := range h { - n := binary.PutUvarint(buf[:], uint64(len(e.name))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, e.name...) - - n = binary.PutUvarint(buf[:], uint64(e.offset)) - w.b = append(w.b, buf[:n]...) - } - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -func (w *indexWriter) finalize() error { - // Write out hash maps to jump to correct label index and postings sections. - lo := uint32(w.n) - if err := w.writeHashmap(w.labelIndexes); err != nil { - return err - } - - po := uint32(w.n) - if err := w.writeHashmap(w.postings); err != nil { - return err - } - - // Terminate index file with offsets to hashmaps. This is the entry Pointer - // for any index query. - // TODO(fabxc): also store offset to series section to allow plain - // iteration over all existing series? 
-	b := [8]byte{}
-	binary.BigEndian.PutUint32(b[:4], lo)
-	binary.BigEndian.PutUint32(b[4:], po)
-
-	return w.write(w.bufw, b[:])
+	keys   []string
+	offset uint64
 }
 
 func (w *indexWriter) Close() error {
-	if err := w.finalize(); err != nil {
+	if err := w.ensureStage(idxStageDone); err != nil {
 		return err
 	}
-	if err := w.bufw.Flush(); err != nil {
+	if err := w.fbuf.Flush(); err != nil {
 		return err
 	}
 	if err := fileutil.Fsync(w.f); err != nil {
@@ -453,7 +528,8 @@ type StringTuples interface {
 
 type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
-	b []byte
+	b   []byte
+	toc indexTOC
 
 	// Close that releases the underlying resources of the byte slice.
 	c io.Closer
@@ -484,57 +560,77 @@ func newIndexReader(dir string) (*indexReader, error) {
 		return nil, errors.Errorf("invalid magic number %x", m)
 	}
 
-	// The last two 4 bytes hold the pointers to the hashmaps.
-	loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
-	poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
+	if err := r.readTOC(); err != nil {
+		return nil, errors.Wrap(err, "read TOC")
+	}
 
-	flag, b, err := r.section(loff)
+	r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
 	if err != nil {
-		return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
-	}
-	if r.labels, err = readHashmap(flag, b); err != nil {
-		return nil, errors.Wrap(err, "read label index hashmap")
+		return nil, errors.Wrap(err, "read label index table")
 	}
 
-	flag, b, err = r.section(poff)
+	r.postings, err = r.readOffsetTable(r.toc.postingsTable)
 	if err != nil {
-		return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
-	}
-	if r.postings, err = readHashmap(flag, b); err != nil {
-		return nil, errors.Wrap(err, "read postings hashmap")
+		return nil, errors.Wrap(err, "read postings table")
 	}
 
 	return r, nil
 }
 
-func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
-	if flag != flagStd {
-		return nil, errInvalidFlag
+func (r *indexReader) readTOC() error {
+	d := r.decbufAt(len(r.b) - indexTOCLen)
+
+	r.toc.symbols = d.be64()
+	r.toc.series = d.be64()
+	r.toc.labelIndices = d.be64()
+	r.toc.labelIndicesTable = d.be64()
+	r.toc.postings = d.be64()
+	r.toc.postingsTable = d.be64()
+
+	// TODO(fabxc): validate checksum.
+
+	return d.err()
+}
+
+func (r *indexReader) decbufAt(off int) decbuf {
+	if len(r.b) < off {
+		return decbuf{e: errInvalidSize}
 	}
-	h := make(map[string]uint32, 512)
+	return decbuf{b: r.b[off:]}
+}
 
-	for len(b) > 0 {
-		l, n := binary.Uvarint(b)
-		if n < 1 {
-			return nil, errors.Wrap(errInvalidSize, "read key length")
-		}
-		b = b[n:]
+// readOffsetTable reads an offset table at the given position and returns a map
+// with the key strings concatenated by the 0xff byte.
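+// That byte never occurs in valid utf-8 encoded key strings, so the joined
+// form remains unambiguous.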
+func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
+	// A table might not have been written at all, in which case the position
+	// is zeroed out.
+	if off == 0 {
+		return nil, nil
+	}
 
-		if len(b) < int(l) {
-			return nil, errors.Wrap(errInvalidSize, "read key")
-		}
-		s := string(b[:l])
-		b = b[l:]
+	const sep = "\xff"
 
-		o, n := binary.Uvarint(b)
-		if n < 1 {
-			return nil, errors.Wrap(errInvalidSize, "read offset value")
+	var (
+		d1  = r.decbufAt(int(off))
+		cnt = d1.be32()
+		d2  = d1.decbuf(d1.be32int())
+	)
+
+	res := make(map[string]uint32, 512)
+
+	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
+		keyCount := d2.uvarint()
+		keys := make([]string, 0, keyCount)
+
+		for i := 0; i < keyCount; i++ {
+			keys = append(keys, d2.uvarintStr())
 		}
-		b = b[n:]
+		res[strings.Join(keys, sep)] = uint32(d2.uvarint())
 
-		h[s] = uint32(o)
+		cnt--
 	}
 
-	return h, nil
+	// TODO(fabxc): verify checksum from remainder of d1.
+	return res, d2.err()
 }
 
 func (r *indexReader) Close() error {
@@ -561,25 +657,19 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
 }
 
 func (r *indexReader) lookupSymbol(o uint32) (string, error) {
-	if int(o) > len(r.b) {
-		return "", errors.Errorf("invalid symbol offset %d", o)
-	}
-	l, n := binary.Uvarint(r.b[o:])
-	if n < 0 {
-		return "", errors.New("reading symbol length failed")
-	}
+	d := r.decbufAt(int(o))
 
-	end := int(o) + n + int(l)
-	if end > len(r.b) {
-		return "", errors.New("invalid length")
+	s := d.uvarintStr()
+	if d.err() != nil {
+		return "", errors.Wrapf(d.err(), "read symbol at %d", o)
 	}
-	b := r.b[int(o)+n : end]
-
-	return yoloString(b), nil
+	return s, nil
 }
 
 func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
-	key := strings.Join(names, string(sep))
+	const sep = "\xff"
+
+	key := strings.Join(names, sep)
 	off, ok := r.labels[key]
 	if !ok {
 		// XXX(fabxc): hot fix. Should return a partial data error and handle cases
@@ -588,21 +678,21 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 		//return nil, fmt.Errorf("label index doesn't exist")
 	}
 
-	flag, b, err := r.section(off)
-	if err != nil {
-		return nil, errors.Wrapf(err, "section at %d", off)
-	}
-	if flag != flagStd {
-		return nil, errInvalidFlag
-	}
-	l, n := binary.Uvarint(b)
-	if n < 1 {
-		return nil, errors.Wrap(errInvalidSize, "read label index size")
+	d1 := r.decbufAt(int(off))
+	d2 := d1.decbuf(d1.be32int())
+
+	nc := d2.be32int()
+	d2.be32() // consume unused value entry count.
+
+	if d2.err() != nil {
+		return nil, errors.Wrap(d2.err(), "read label value index")
 	}
 
+	// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
+ st := &serializedStringTuples{ - l: int(l), - b: b[n:], + l: nc, + b: d2.get(), lookup: r.lookupSymbol, } return st, nil @@ -614,110 +704,89 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } func (r *indexReader) LabelIndices() ([][]string, error) { + const sep = "\xff" + res := [][]string{} for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) + res = append(res, strings.Split(s, sep)) } return res, nil } func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } + d1 := r.decbufAt(int(ref)) + d2 := d1.decbuf(int(d1.uvarint())) - b := r.b[int(ref)+n:] + k := int(d2.uvarint()) lbls := make(labels.Labels, 0, k) - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") + for i := 0; i < k; i++ { + lno := uint32(d2.uvarint()) + lvo := uint32(d2.uvarint()) + + if d2.err() != nil { + return nil, nil, errors.Wrap(d2.err(), "read series label offsets") } - b = b[m:] - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + ln, err := r.lookupSymbol(lno) + if err != nil { + return nil, nil, errors.Wrap(err, "lookup label name") } - v, err := r.lookupSymbol(uint32(o)) + lv, err := r.lookupSymbol(lvo) if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") + return nil, nil, errors.Wrap(err, "lookup label value") } - b = b[m:] - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) + lbls = append(lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] + k = int(d2.uvarint()) chunks := make([]*ChunkMeta, 0, k) - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] + for i := 0; i < k; i++ { + mint := d2.varint64() + maxt := d2.varint64() + off := d2.uvarint64() - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") + if d2.err() != nil { + return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - b = b[n:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") - } - b = b[n:] chunks = append(chunks, &ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, + Ref: off, + MinTime: mint, + MaxTime: maxt, }) } + // TODO(fabxc): verify CRC32. + return lbls, chunks, nil } func (r *indexReader) Postings(name, value string) (Postings, error) { - key := name + string(sep) + value + const sep = "\xff" + key := strings.Join([]string{name, value}, sep) off, ok := r.postings[key] if !ok { return emptyPostings, nil } - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } + d1 := r.decbufAt(int(off)) + d2 := d1.decbuf(d1.be32int()) - if flag != flagStd { - return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) - } + d2.be32() // consume unused postings list length. - // Add iterator over the bytes. 
-	if len(b)%4 != 0 {
-		return nil, errors.Wrap(errInvalidSize, "plain postings entry")
+	if d2.err() != nil {
+		return nil, errors.Wrap(d2.err(), "get postings bytes")
 	}
-	return newBigEndianPostings(b), nil
+
+	// TODO(fabxc): read checksum from the 4 remainder bytes of d1 and verify.
+
+	return newBigEndianPostings(d2.get()), nil
 }
 
 type stringTuples struct {
@@ -766,7 +835,6 @@ type serializedStringTuples struct {
 }
 
 func (t *serializedStringTuples) Len() int {
-	// TODO(fabxc): Cache this?
 	return len(t.b) / (4 * t.l)
 }
 
diff --git a/index_test.go b/index_test.go
index 3522ca68e..b38350dcf 100644
--- a/index_test.go
+++ b/index_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/chunks"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/stretchr/testify/require"
 )
@@ -50,10 +51,14 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) 
 		return errors.Errorf("series with reference %d already added", ref)
 	}
 
-	m.series[ref] = series{
-		l:      l,
-		chunks: chunks,
+	s := series{l: l}
+	// Actual chunk data is not stored in the index.
+	for _, c := range chunks {
+		cc := *c
+		cc.Chunk = nil
+		s.chunks = append(s.chunks, &cc)
 	}
+	m.series[ref] = s
 
 	return nil
 }
@@ -241,6 +246,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 				MinTime: int64(j * 10000),
 				MaxTime: int64((j + 1) * 10000),
 				Ref:     rand.Uint64(),
+				Chunk:   chunks.NewXORChunk(),
 			})
 		}
 		input = append(input, &indexWriterSeries{