diff --git a/chunks.go b/chunks.go
new file mode 100644
index 000000000..c0c7954a6
--- /dev/null
+++ b/chunks.go
@@ -0,0 +1,301 @@
+package tsdb
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"hash"
+	"hash/crc32"
+	"io"
+	"os"
+
+	"github.com/coreos/etcd/pkg/fileutil"
+	"github.com/fabxc/tsdb/chunks"
+	"github.com/pkg/errors"
+)
+
+const (
+	// MagicChunks 4 bytes at the head of a chunks file.
+	MagicChunks = 0x85BD40DD
+)
+
+// ChunkMeta holds information about a chunk of data.
+type ChunkMeta struct {
+	// Ref and Chunk hold either a reference that can be used to retrieve
+	// chunk data or the data itself.
+	// Generally, only one of them is set.
+	Ref   uint64
+	Chunk chunks.Chunk
+
+	MinTime, MaxTime int64 // time range the data covers
+}
+
+// ChunkWriter serializes a time block of chunked series data.
+type ChunkWriter interface {
+	// WriteChunks writes several chunks. The data field of the ChunkMetas
+	// must be populated.
+	// After returning successfully, the Ref fields in the ChunkMetas
+	// are set and can be used to retrieve the chunks from the written data.
+	WriteChunks(chunks ...ChunkMeta) error
+
+	// Close writes any required finalization and closes the resources
+	// associated with the underlying writer.
+	Close() error
+}
+
+// chunkWriter implements the ChunkWriter interface for the standard
+// serialization format.
+type chunkWriter struct {
+	dirFile *os.File
+	files   []*os.File
+	wbuf    *bufio.Writer
+	n       int64
+	crc32   hash.Hash
+
+	segmentSize int64
+}
+
+const (
+	defaultChunkSegmentSize = 512 * 1024 * 1024
+
+	chunksFormatV1 = 1
+)
+
+func newChunkWriter(dir string) (*chunkWriter, error) {
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return nil, err
+	}
+	dirFile, err := fileutil.OpenDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	cw := &chunkWriter{
+		dirFile:     dirFile,
+		n:           0,
+		crc32:       crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+		segmentSize: defaultChunkSegmentSize,
+	}
+	return cw, nil
+}
+
+func (w *chunkWriter) tail() *os.File {
+	if len(w.files) == 0 {
+		return nil
+	}
+	return w.files[len(w.files)-1]
+}
+
+// finalizeTail writes all pending data to the current tail file,
+// truncates its size, and closes it.
+func (w *chunkWriter) finalizeTail() error {
+	tf := w.tail()
+	if tf == nil {
+		return nil
+	}
+
+	if err := w.wbuf.Flush(); err != nil {
+		return err
+	}
+	if err := fileutil.Fsync(tf); err != nil {
+		return err
+	}
+	// As the file was pre-allocated, we truncate any superfluous zero bytes.
+	off, err := tf.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	if err := tf.Truncate(off); err != nil {
+		return err
+	}
+	return tf.Close()
+}
+
+func (w *chunkWriter) cut() error {
+	// Sync current tail to disk and close.
+	if err := w.finalizeTail(); err != nil {
+		return err
+	}
+
+	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
+	if err != nil {
+		return err
+	}
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
+		return err
+	}
+	if err = w.dirFile.Sync(); err != nil {
+		return err
+	}
+
+	// Write header metadata for new file.
+ + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], MagicChunks) + metab[4] = chunksFormatV1 + + if _, err := f.Write(metab); err != nil { + return err + } + + w.files = append(w.files, f) + if w.wbuf != nil { + w.wbuf.Reset(f) + } else { + w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) + } + w.n = 8 + + return nil +} + +func (w *chunkWriter) write(wr io.Writer, b []byte) error { + n, err := wr.Write(b) + w.n += int64(n) + return err +} + +func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { + // Calculate maximum space we need and cut a new segment in case + // we don't fit into the current one. + maxLen := int64(binary.MaxVarintLen32) + for _, c := range chks { + maxLen += binary.MaxVarintLen32 + 1 + maxLen += int64(len(c.Chunk.Bytes())) + } + newsz := w.n + maxLen + + if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize { + if err := w.cut(); err != nil { + return err + } + } + + // Write chunks sequentially and set the reference field in the ChunkMeta. + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.wbuf) + + b := make([]byte, binary.MaxVarintLen32) + n := binary.PutUvarint(b, uint64(len(chks))) + + if err := w.write(wr, b[:n]); err != nil { + return err + } + seq := uint64(w.seq()) << 32 + + for i := range chks { + chk := &chks[i] + + chk.Ref = seq | uint64(w.n) + + n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) + + if err := w.write(wr, b[:n]); err != nil { + return err + } + if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { + return err + } + if err := w.write(wr, chk.Chunk.Bytes()); err != nil { + return err + } + chk.Chunk = nil + } + + if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { + return err + } + return nil +} + +func (w *chunkWriter) seq() int { + return len(w.files) - 1 +} + +func (w *chunkWriter) Close() error { + return w.finalizeTail() +} + +// ChunkReader provides reading access of serialized time series data. +type ChunkReader interface { + // Chunk returns the series data chunk with the given reference. + Chunk(ref uint64) (chunks.Chunk, error) + + // Close releases all underlying resources of the reader. + Close() error +} + +// chunkReader implements a SeriesReader for a serialized byte stream +// of series data. +type chunkReader struct { + // The underlying bytes holding the encoded series data. + bs [][]byte + + // Closers for resources behind the byte slices. + cs []io.Closer +} + +// newChunkReader returns a new chunkReader based on mmaped files found in dir. +func newChunkReader(dir string) (*chunkReader, error) { + files, err := sequenceFiles(dir, "") + if err != nil { + return nil, err + } + var cr chunkReader + + for _, fn := range files { + f, err := openMmapFile(fn) + if err != nil { + return nil, errors.Wrapf(err, "mmap files") + } + cr.cs = append(cr.cs, f) + cr.bs = append(cr.bs, f.b) + } + + for i, b := range cr.bs { + if len(b) < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks { + return nil, fmt.Errorf("invalid magic number %x", m) + } + } + return &cr, nil +} + +func (s *chunkReader) Close() error { + return closeAll(s.cs...) 
+} + +func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { + var ( + seq = int(ref >> 32) + off = int((ref << 32) >> 32) + ) + if seq >= len(s.bs) { + return nil, errors.Errorf("reference sequence %d out of range", seq) + } + b := s.bs[seq] + + if int(off) >= len(b) { + return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + } + b = b[off:] + + l, n := binary.Uvarint(b) + if n < 0 { + return nil, fmt.Errorf("reading chunk length failed") + } + b = b[n:] + enc := chunks.Encoding(b[0]) + + c, err := chunks.FromData(enc, b[1:1+l]) + if err != nil { + return nil, err + } + return c, nil +} diff --git a/chunks_test.go b/chunks_test.go new file mode 100644 index 000000000..3255ffe3e --- /dev/null +++ b/chunks_test.go @@ -0,0 +1,16 @@ +package tsdb + +import "github.com/fabxc/tsdb/chunks" + +type mockChunkReader struct { + chunk func(ref uint64) (chunks.Chunk, error) + close func() error +} + +func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { + return cr.chunk(ref) +} + +func (cr *mockChunkReader) Close() error { + return cr.close() +} diff --git a/writer.go b/index.go similarity index 55% rename from writer.go rename to index.go index 264052d32..23771f035 100644 --- a/writer.go +++ b/index.go @@ -3,6 +3,7 @@ package tsdb import ( "bufio" "encoding/binary" + "fmt" "hash" "hash/crc32" "io" @@ -12,220 +13,19 @@ import ( "strings" "github.com/coreos/etcd/pkg/fileutil" - "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" ) const ( - // MagicSeries 4 bytes at the head of series file. - MagicSeries = 0x85BD40DD - // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 + + indexFormatV1 = 1 ) const compactionPageBytes = minSectorSize * 64 -// ChunkWriter serializes a time block of chunked series data. -type ChunkWriter interface { - // WriteChunks writes several chunks. The data field of the ChunkMetas - // must be populated. - // After returning successfully, the Ref fields in the ChunkMetas - // is set and can be used to retrieve the chunks from the written data. - WriteChunks(chunks ...ChunkMeta) error - - // Close writes any required finalization and closes the resources - // associated with the underlying writer. - Close() error -} - -// chunkWriter implements the ChunkWriter interface for the standard -// serialization format. -type chunkWriter struct { - dirFile *os.File - files []*os.File - wbuf *bufio.Writer - n int64 - crc32 hash.Hash - - segmentSize int64 -} - -const ( - defaultChunkSegmentSize = 512 * 1024 * 1024 - - chunksFormatV1 = 1 - indexFormatV1 = 1 -) - -func newChunkWriter(dir string) (*chunkWriter, error) { - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - cw := &chunkWriter{ - dirFile: dirFile, - n: 0, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), - segmentSize: defaultChunkSegmentSize, - } - return cw, nil -} - -func (w *chunkWriter) tail() *os.File { - if len(w.files) == 0 { - return nil - } - return w.files[len(w.files)-1] -} - -// finalizeTail writes all pending data to the current tail file, -// truncates its size, and closes it. -func (w *chunkWriter) finalizeTail() error { - tf := w.tail() - if tf == nil { - return nil - } - - if err := w.wbuf.Flush(); err != nil { - return err - } - if err := fileutil.Fsync(tf); err != nil { - return err - } - // As the file was pre-allocated, we truncate any superfluous zero bytes. 
- off, err := tf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := tf.Truncate(off); err != nil { - return err - } - return tf.Close() -} - -func (w *chunkWriter) cut() error { - // Sync current tail to disk and close. - w.finalizeTail() - - p, _, err := nextSequenceFile(w.dirFile.Name(), "") - if err != nil { - return err - } - f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return err - } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return err - } - if err = w.dirFile.Sync(); err != nil { - return err - } - - // Write header metadata for new file. - - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], MagicSeries) - metab[4] = chunksFormatV1 - - if _, err := f.Write(metab); err != nil { - return err - } - - w.files = append(w.files, f) - if w.wbuf != nil { - w.wbuf.Reset(f) - } else { - w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) - } - w.n = 8 - - return nil -} - -func (w *chunkWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) - return err -} - -func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { - // Calculate maximum space we need and cut a new segment in case - // we don't fit into the current one. - maxLen := int64(binary.MaxVarintLen32) - for _, c := range chks { - maxLen += binary.MaxVarintLen32 + 1 - maxLen += int64(len(c.Chunk.Bytes())) - } - newsz := w.n + maxLen - - if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize { - if err := w.cut(); err != nil { - return err - } - } - - // Write chunks sequentially and set the reference field in the ChunkMeta. - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.wbuf) - - b := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(b, uint64(len(chks))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - seq := uint64(w.seq()) << 32 - - for i := range chks { - chk := &chks[i] - - chk.Ref = seq | uint64(w.n) - - n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { - return err - } - if err := w.write(wr, chk.Chunk.Bytes()); err != nil { - return err - } - chk.Chunk = nil - } - - if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { - return err - } - return nil -} - -func (w *chunkWriter) seq() int { - return len(w.files) - 1 -} - -func (w *chunkWriter) Close() error { - return w.finalizeTail() -} - -// ChunkMeta holds information about a chunk of data. -type ChunkMeta struct { - // Ref and Chunk hold either a reference that can be used to retrieve - // chunk data or the data itself. - // Generally, only one of them is set. - Ref uint64 - Chunk chunks.Chunk - - MinTime, MaxTime int64 // time range the data covers -} - // IndexWriter serialized the index for a block of series data. // The methods must generally be called in order they are specified. type IndexWriter interface { @@ -609,3 +409,368 @@ func (w *indexWriter) Close() error { } return w.f.Close() } + +// IndexReader provides reading access of serialized index data. +type IndexReader interface { + // LabelValues returns the possible label values + LabelValues(names ...string) (StringTuples, error) + + // Postings returns the postings list iterator for the label pair. + Postings(name, value string) (Postings, error) + + // Series returns the series for the given reference. 
+	Series(ref uint32) (labels.Labels, []ChunkMeta, error)
+
+	// LabelIndices returns the label pairs for which indices exist.
+	LabelIndices() ([][]string, error)
+
+	// Close releases the underlying resources of the reader.
+	Close() error
+}
+
+// StringTuples provides access to a sorted list of string tuples.
+type StringTuples interface {
+	// Total number of tuples in the list.
+	Len() int
+	// At returns the tuple at position i.
+	At(i int) ([]string, error)
+}
+
+type indexReader struct {
+	// The underlying byte slice holding the encoded series data.
+	b []byte
+
+	// Closer that releases the underlying resources of the byte slice.
+	c io.Closer
+
+	// Cached hashmaps of section offsets.
+	labels   map[string]uint32
+	postings map[string]uint32
+}
+
+var (
+	errInvalidSize = fmt.Errorf("invalid size")
+	errInvalidFlag = fmt.Errorf("invalid flag")
+)
+
+// newIndexReader returns a new indexReader on the given directory.
+func newIndexReader(dir string) (*indexReader, error) {
+	f, err := openMmapFile(filepath.Join(dir, "index"))
+	if err != nil {
+		return nil, err
+	}
+	r := &indexReader{b: f.b, c: f}
+
+	// Verify magic number.
+	if len(f.b) < 4 {
+		return nil, errors.Wrap(errInvalidSize, "index header")
+	}
+	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+		return nil, errors.Errorf("invalid magic number %x", m)
+	}
+
+	// The last 8 bytes hold the two 4-byte pointers to the hashmaps.
+	loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
+	poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
+
+	flag, b, err := r.section(loff)
+	if err != nil {
+		return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
+	}
+	if r.labels, err = readHashmap(flag, b); err != nil {
+		return nil, errors.Wrap(err, "read label index hashmap")
+	}
+	flag, b, err = r.section(poff)
+	if err != nil {
+		return nil, errors.Wrapf(err, "postings hashmap section at %d", poff)
+	}
+	if r.postings, err = readHashmap(flag, b); err != nil {
+		return nil, errors.Wrap(err, "read postings hashmap")
+	}
+
+	return r, nil
+}
+
+func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
+	if flag != flagStd {
+		return nil, errInvalidFlag
+	}
+	h := make(map[string]uint32, 512)
+
+	for len(b) > 0 {
+		l, n := binary.Uvarint(b)
+		if n < 1 {
+			return nil, errors.Wrap(errInvalidSize, "read key length")
+		}
+		b = b[n:]
+
+		if len(b) < int(l) {
+			return nil, errors.Wrap(errInvalidSize, "read key")
+		}
+		s := string(b[:l])
+		b = b[l:]
+
+		o, n := binary.Uvarint(b)
+		if n < 1 {
+			return nil, errors.Wrap(errInvalidSize, "read offset value")
+		}
+		b = b[n:]
+
+		h[s] = uint32(o)
+	}
+
+	return h, nil
+}
+
+func (r *indexReader) Close() error {
+	return r.c.Close()
+}
+
+func (r *indexReader) section(o uint32) (byte, []byte, error) {
+	b := r.b[o:]
+
+	if len(b) < 5 {
+		return 0, nil, errors.Wrap(errInvalidSize, "read header")
+	}
+
+	flag := b[0]
+	l := binary.BigEndian.Uint32(b[1:5])
+
+	b = b[5:]
+
+	// b must have the given length plus 4 bytes for the CRC32 checksum.
+ if len(b) < int(l)+4 { + return 0, nil, errors.Wrap(errInvalidSize, "section content") + } + return flag, b[:l], nil +} + +func (r *indexReader) lookupSymbol(o uint32) (string, error) { + if int(o) > len(r.b) { + return "", errors.Errorf("invalid symbol offset %d", o) + } + l, n := binary.Uvarint(r.b[o:]) + if n < 0 { + return "", errors.New("reading symbol length failed") + } + + end := int(o) + n + int(l) + if end > len(r.b) { + return "", errors.New("invalid length") + } + b := r.b[int(o)+n : end] + + return yoloString(b), nil +} + +func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { + key := strings.Join(names, string(sep)) + off, ok := r.labels[key] + if !ok { + return nil, fmt.Errorf("label index doesn't exist") + } + + flag, b, err := r.section(off) + if err != nil { + return nil, errors.Wrapf(err, "section at %d", off) + } + if flag != flagStd { + return nil, errInvalidFlag + } + l, n := binary.Uvarint(b) + if n < 1 { + return nil, errors.Wrap(errInvalidSize, "read label index size") + } + + st := &serializedStringTuples{ + l: int(l), + b: b[n:], + lookup: r.lookupSymbol, + } + return st, nil +} + +func (r *indexReader) LabelIndices() ([][]string, error) { + res := [][]string{} + + for s := range r.labels { + res = append(res, strings.Split(s, string(sep))) + } + return res, nil +} + +func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { + k, n := binary.Uvarint(r.b[ref:]) + if n < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "number of labels") + } + + b := r.b[int(ref)+n:] + lbls := make(labels.Labels, 0, k) + + for i := 0; i < 2*int(k); i += 2 { + o, m := binary.Uvarint(b) + if m < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + } + n, err := r.lookupSymbol(uint32(o)) + if err != nil { + return nil, nil, errors.Wrap(err, "symbol lookup") + } + b = b[m:] + + o, m = binary.Uvarint(b) + if m < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + } + v, err := r.lookupSymbol(uint32(o)) + if err != nil { + return nil, nil, errors.Wrap(err, "symbol lookup") + } + b = b[m:] + + lbls = append(lbls, labels.Label{ + Name: n, + Value: v, + }) + } + + // Read the chunks meta data. + k, n = binary.Uvarint(b) + if n < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") + } + + b = b[n:] + chunks := make([]ChunkMeta, 0, k) + + for i := 0; i < int(k); i++ { + firstTime, n := binary.Varint(b) + if n < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "first time") + } + b = b[n:] + + lastTime, n := binary.Varint(b) + if n < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "last time") + } + b = b[n:] + + o, n := binary.Uvarint(b) + if n < 1 { + return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") + } + b = b[n:] + + chunks = append(chunks, ChunkMeta{ + Ref: o, + MinTime: firstTime, + MaxTime: lastTime, + }) + } + + return lbls, chunks, nil +} + +func (r *indexReader) Postings(name, value string) (Postings, error) { + key := name + string(sep) + value + + off, ok := r.postings[key] + if !ok { + return nil, ErrNotFound + } + + flag, b, err := r.section(off) + if err != nil { + return nil, errors.Wrapf(err, "section at %d", off) + } + + if flag != flagStd { + return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) + } + + // TODO(fabxc): just read into memory as an intermediate solution. + // Add iterator over serialized data. 
+ var l []uint32 + + for len(b) > 0 { + if len(b) < 4 { + return nil, errors.Wrap(errInvalidSize, "plain postings entry") + } + l = append(l, binary.BigEndian.Uint32(b[:4])) + + b = b[4:] + } + + return &listPostings{list: l, idx: -1}, nil +} + +type stringTuples struct { + l int // tuple length + s []string // flattened tuple entries +} + +func newStringTuples(s []string, l int) (*stringTuples, error) { + if len(s)%l != 0 { + return nil, errors.Wrap(errInvalidSize, "string tuple list") + } + return &stringTuples{s: s, l: l}, nil +} + +func (t *stringTuples) Len() int { return len(t.s) / t.l } +func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil } + +func (t *stringTuples) Swap(i, j int) { + c := make([]string, t.l) + copy(c, t.s[i:i+t.l]) + + for k := 0; k < t.l; k++ { + t.s[i+k] = t.s[j+k] + t.s[j+k] = c[k] + } +} + +func (t *stringTuples) Less(i, j int) bool { + for k := 0; k < t.l; k++ { + d := strings.Compare(t.s[i+k], t.s[j+k]) + + if d < 0 { + return true + } + if d > 0 { + return false + } + } + return false +} + +type serializedStringTuples struct { + l int + b []byte + lookup func(uint32) (string, error) +} + +func (t *serializedStringTuples) Len() int { + // TODO(fabxc): Cache this? + return len(t.b) / (4 * t.l) +} + +func (t *serializedStringTuples) At(i int) ([]string, error) { + if len(t.b) < (i+t.l)*4 { + return nil, errInvalidSize + } + res := make([]string, 0, t.l) + + for k := 0; k < t.l; k++ { + offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) + + s, err := t.lookup(offset) + if err != nil { + return nil, errors.Wrap(err, "symbol lookup") + } + res = append(res, s) + } + + return res, nil +} diff --git a/rw_test.go b/index_test.go similarity index 90% rename from rw_test.go rename to index_test.go index 4f6dad169..b65ce58a0 100644 --- a/rw_test.go +++ b/index_test.go @@ -40,18 +40,6 @@ func (ir *mockIndexReader) Close() error { return ir.close() } -type mockChunkReader struct { - chunk func(ref uint64) (chunks.Chunk, error) - close func() error -} - -func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - return cr.chunk(ref) -} - -func (cr *mockChunkReader) Close() error { - return cr.close() -} func TestPersistence_index_e2e(t *testing.T) { dir, err := ioutil.TempDir("", "test_persistence_e2e") @@ -152,8 +140,4 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, ir.Close()) -} - -func BenchmarkPersistence_index_write(b *testing.B) { - -} +} \ No newline at end of file diff --git a/reader.go b/reader.go deleted file mode 100644 index d4d816c1f..000000000 --- a/reader.go +++ /dev/null @@ -1,459 +0,0 @@ -package tsdb - -import ( - "encoding/binary" - "fmt" - "io" - "path/filepath" - "strings" - - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/labels" - "github.com/pkg/errors" -) - -// ChunkReader provides reading access of serialized time series data. -type ChunkReader interface { - // Chunk returns the series data chunk with the given reference. - Chunk(ref uint64) (chunks.Chunk, error) - - // Close releases all underlying resources of the reader. - Close() error -} - -// chunkReader implements a SeriesReader for a serialized byte stream -// of series data. -type chunkReader struct { - // The underlying bytes holding the encoded series data. - bs [][]byte - - // Closers for resources behind the byte slices. - cs []io.Closer -} - -// newChunkReader returns a new chunkReader based on mmaped files found in dir. 
-func newChunkReader(dir string) (*chunkReader, error) { - files, err := sequenceFiles(dir, "") - if err != nil { - return nil, err - } - var cr chunkReader - - for _, fn := range files { - f, err := openMmapFile(fn) - if err != nil { - return nil, errors.Wrapf(err, "mmap files") - } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) - } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil -} - -func (s *chunkReader) Close() error { - return closeAll(s.cs...) -} - -func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - var ( - seq = int(ref >> 32) - off = int((ref << 32) >> 32) - ) - if seq >= len(s.bs) { - return nil, errors.Errorf("reference sequence %d out of range", seq) - } - b := s.bs[seq] - - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) - } - b = b[off:] - - l, n := binary.Uvarint(b) - if n < 0 { - return nil, fmt.Errorf("reading chunk length failed") - } - b = b[n:] - enc := chunks.Encoding(b[0]) - - c, err := chunks.FromData(enc, b[1:1+l]) - if err != nil { - return nil, err - } - return c, nil -} - -// IndexReader provides reading access of serialized index data. -type IndexReader interface { - // LabelValues returns the possible label values - LabelValues(names ...string) (StringTuples, error) - - // Postings returns the postings list iterator for the label pair. - Postings(name, value string) (Postings, error) - - // Series returns the series for the given reference. - Series(ref uint32) (labels.Labels, []ChunkMeta, error) - - // LabelIndices returns the label pairs for which indices exist. - LabelIndices() ([][]string, error) - - // Close released the underlying resources of the reader. - Close() error -} - -// StringTuples provides access to a sorted list of string tuples. -type StringTuples interface { - // Total number of tuples in the list. - Len() int - // At returns the tuple at position i. - At(i int) ([]string, error) -} - -type indexReader struct { - // The underlying byte slice holding the encoded series data. - b []byte - - // Close that releases the underlying resources of the byte slice. - c io.Closer - - // Cached hashmaps of section offsets. - labels map[string]uint32 - postings map[string]uint32 -} - -var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") -) - -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) - if err != nil { - return nil, err - } - r := &indexReader{b: f.b, c: f} - - // Verify magic number. - if len(f.b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") - } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { - return nil, errors.Errorf("invalid magic number %x", m) - } - - // The last two 4 bytes hold the pointers to the hashmaps. 
- loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) - poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) - - flag, b, err := r.section(loff) - if err != nil { - return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) - } - if r.labels, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read label index hashmap") - } - flag, b, err = r.section(poff) - if err != nil { - return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) - } - if r.postings, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read postings hashmap") - } - - return r, nil -} - -func readHashmap(flag byte, b []byte) (map[string]uint32, error) { - if flag != flagStd { - return nil, errInvalidFlag - } - h := make(map[string]uint32, 512) - - for len(b) > 0 { - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read key length") - } - b = b[n:] - - if len(b) < int(l) { - return nil, errors.Wrap(errInvalidSize, "read key") - } - s := string(b[:l]) - b = b[l:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read offset value") - } - b = b[n:] - - h[s] = uint32(o) - } - - return h, nil -} - -func (r *indexReader) Close() error { - return r.c.Close() -} - -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - -func (r *indexReader) lookupSymbol(o uint32) (string, error) { - if int(o) > len(r.b) { - return "", errors.Errorf("invalid symbol offset %d", o) - } - l, n := binary.Uvarint(r.b[o:]) - if n < 0 { - return "", errors.New("reading symbol length failed") - } - - end := int(o) + n + int(l) - if end > len(r.b) { - return "", errors.New("invalid length") - } - b := r.b[int(o)+n : end] - - return yoloString(b), nil -} - -func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { - key := strings.Join(names, string(sep)) - off, ok := r.labels[key] - if !ok { - return nil, fmt.Errorf("label index doesn't exist") - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - if flag != flagStd { - return nil, errInvalidFlag - } - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read label index size") - } - - st := &serializedStringTuples{ - l: int(l), - b: b[n:], - lookup: r.lookupSymbol, - } - return st, nil -} - -func (r *indexReader) LabelIndices() ([][]string, error) { - res := [][]string{} - - for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) - } - return res, nil -} - -func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } - - b := r.b[int(ref)+n:] - lbls := make(labels.Labels, 0, k) - - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, 
"symbol offset") - } - v, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) - } - - // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] - chunks := make([]ChunkMeta, 0, k) - - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] - - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") - } - b = b[n:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") - } - b = b[n:] - - chunks = append(chunks, ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, - }) - } - - return lbls, chunks, nil -} - -func (r *indexReader) Postings(name, value string) (Postings, error) { - key := name + string(sep) + value - - off, ok := r.postings[key] - if !ok { - return nil, ErrNotFound - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - - if flag != flagStd { - return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) - } - - // TODO(fabxc): just read into memory as an intermediate solution. - // Add iterator over serialized data. - var l []uint32 - - for len(b) > 0 { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "plain postings entry") - } - l = append(l, binary.BigEndian.Uint32(b[:4])) - - b = b[4:] - } - - return &listPostings{list: l, idx: -1}, nil -} - -type stringTuples struct { - l int // tuple length - s []string // flattened tuple entries -} - -func newStringTuples(s []string, l int) (*stringTuples, error) { - if len(s)%l != 0 { - return nil, errors.Wrap(errInvalidSize, "string tuple list") - } - return &stringTuples{s: s, l: l}, nil -} - -func (t *stringTuples) Len() int { return len(t.s) / t.l } -func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil } - -func (t *stringTuples) Swap(i, j int) { - c := make([]string, t.l) - copy(c, t.s[i:i+t.l]) - - for k := 0; k < t.l; k++ { - t.s[i+k] = t.s[j+k] - t.s[j+k] = c[k] - } -} - -func (t *stringTuples) Less(i, j int) bool { - for k := 0; k < t.l; k++ { - d := strings.Compare(t.s[i+k], t.s[j+k]) - - if d < 0 { - return true - } - if d > 0 { - return false - } - } - return false -} - -type serializedStringTuples struct { - l int - b []byte - lookup func(uint32) (string, error) -} - -func (t *serializedStringTuples) Len() int { - // TODO(fabxc): Cache this? - return len(t.b) / (4 * t.l) -} - -func (t *serializedStringTuples) At(i int) ([]string, error) { - if len(t.b) < (i+t.l)*4 { - return nil, errInvalidSize - } - res := make([]string, 0, t.l) - - for k := 0; k < t.l; k++ { - offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) - - s, err := t.lookup(offset) - if err != nil { - return nil, errors.Wrap(err, "symbol lookup") - } - res = append(res, s) - } - - return res, nil -}