mirror of https://github.com/prometheus/prometheus
Split persistence by chunk/index instead of read/write
parent
3910b913bd
commit
8a7addfc44
|
@ -0,0 +1,299 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"hash"
|
||||||
|
"hash/crc32"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
"github.com/fabxc/tsdb/chunks"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MagicSeries 4 bytes at the head of series file.
|
||||||
|
MagicChunks = 0x85BD40DD
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChunkMeta holds information about a chunk of data.
|
||||||
|
type ChunkMeta struct {
|
||||||
|
// Ref and Chunk hold either a reference that can be used to retrieve
|
||||||
|
// chunk data or the data itself.
|
||||||
|
// Generally, only one of them is set.
|
||||||
|
Ref uint64
|
||||||
|
Chunk chunks.Chunk
|
||||||
|
|
||||||
|
MinTime, MaxTime int64 // time range the data covers
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChunkWriter serializes a time block of chunked series data.
|
||||||
|
type ChunkWriter interface {
|
||||||
|
// WriteChunks writes several chunks. The data field of the ChunkMetas
|
||||||
|
// must be populated.
|
||||||
|
// After returning successfully, the Ref fields in the ChunkMetas
|
||||||
|
// is set and can be used to retrieve the chunks from the written data.
|
||||||
|
WriteChunks(chunks ...ChunkMeta) error
|
||||||
|
|
||||||
|
// Close writes any required finalization and closes the resources
|
||||||
|
// associated with the underlying writer.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// chunkWriter implements the ChunkWriter interface for the standard
|
||||||
|
// serialization format.
|
||||||
|
type chunkWriter struct {
|
||||||
|
dirFile *os.File
|
||||||
|
files []*os.File
|
||||||
|
wbuf *bufio.Writer
|
||||||
|
n int64
|
||||||
|
crc32 hash.Hash
|
||||||
|
|
||||||
|
segmentSize int64
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultChunkSegmentSize = 512 * 1024 * 1024
|
||||||
|
|
||||||
|
chunksFormatV1 = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
func newChunkWriter(dir string) (*chunkWriter, error) {
|
||||||
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dirFile, err := fileutil.OpenDir(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cw := &chunkWriter{
|
||||||
|
dirFile: dirFile,
|
||||||
|
n: 0,
|
||||||
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
||||||
|
segmentSize: defaultChunkSegmentSize,
|
||||||
|
}
|
||||||
|
return cw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) tail() *os.File {
|
||||||
|
if len(w.files) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return w.files[len(w.files)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// finalizeTail writes all pending data to the current tail file,
|
||||||
|
// truncates its size, and closes it.
|
||||||
|
func (w *chunkWriter) finalizeTail() error {
|
||||||
|
tf := w.tail()
|
||||||
|
if tf == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.wbuf.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fileutil.Fsync(tf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// As the file was pre-allocated, we truncate any superfluous zero bytes.
|
||||||
|
off, err := tf.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := tf.Truncate(off); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tf.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) cut() error {
|
||||||
|
// Sync current tail to disk and close.
|
||||||
|
w.finalizeTail()
|
||||||
|
|
||||||
|
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = w.dirFile.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write header metadata for new file.
|
||||||
|
|
||||||
|
metab := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint32(metab[:4], MagicChunks)
|
||||||
|
metab[4] = chunksFormatV1
|
||||||
|
|
||||||
|
if _, err := f.Write(metab); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.files = append(w.files, f)
|
||||||
|
if w.wbuf != nil {
|
||||||
|
w.wbuf.Reset(f)
|
||||||
|
} else {
|
||||||
|
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
|
||||||
|
}
|
||||||
|
w.n = 8
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) write(wr io.Writer, b []byte) error {
|
||||||
|
n, err := wr.Write(b)
|
||||||
|
w.n += int64(n)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
|
||||||
|
// Calculate maximum space we need and cut a new segment in case
|
||||||
|
// we don't fit into the current one.
|
||||||
|
maxLen := int64(binary.MaxVarintLen32)
|
||||||
|
for _, c := range chks {
|
||||||
|
maxLen += binary.MaxVarintLen32 + 1
|
||||||
|
maxLen += int64(len(c.Chunk.Bytes()))
|
||||||
|
}
|
||||||
|
newsz := w.n + maxLen
|
||||||
|
|
||||||
|
if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
|
||||||
|
if err := w.cut(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write chunks sequentially and set the reference field in the ChunkMeta.
|
||||||
|
w.crc32.Reset()
|
||||||
|
wr := io.MultiWriter(w.crc32, w.wbuf)
|
||||||
|
|
||||||
|
b := make([]byte, binary.MaxVarintLen32)
|
||||||
|
n := binary.PutUvarint(b, uint64(len(chks)))
|
||||||
|
|
||||||
|
if err := w.write(wr, b[:n]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
seq := uint64(w.seq()) << 32
|
||||||
|
|
||||||
|
for i := range chks {
|
||||||
|
chk := &chks[i]
|
||||||
|
|
||||||
|
chk.Ref = seq | uint64(w.n)
|
||||||
|
|
||||||
|
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
|
||||||
|
|
||||||
|
if err := w.write(wr, b[:n]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
chk.Chunk = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) seq() int {
|
||||||
|
return len(w.files) - 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *chunkWriter) Close() error {
|
||||||
|
return w.finalizeTail()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChunkReader provides reading access of serialized time series data.
|
||||||
|
type ChunkReader interface {
|
||||||
|
// Chunk returns the series data chunk with the given reference.
|
||||||
|
Chunk(ref uint64) (chunks.Chunk, error)
|
||||||
|
|
||||||
|
// Close releases all underlying resources of the reader.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// chunkReader implements a SeriesReader for a serialized byte stream
|
||||||
|
// of series data.
|
||||||
|
type chunkReader struct {
|
||||||
|
// The underlying bytes holding the encoded series data.
|
||||||
|
bs [][]byte
|
||||||
|
|
||||||
|
// Closers for resources behind the byte slices.
|
||||||
|
cs []io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
||||||
|
func newChunkReader(dir string) (*chunkReader, error) {
|
||||||
|
files, err := sequenceFiles(dir, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var cr chunkReader
|
||||||
|
|
||||||
|
for _, fn := range files {
|
||||||
|
f, err := openMmapFile(fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "mmap files")
|
||||||
|
}
|
||||||
|
cr.cs = append(cr.cs, f)
|
||||||
|
cr.bs = append(cr.bs, f.b)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, b := range cr.bs {
|
||||||
|
if len(b) < 4 {
|
||||||
|
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
||||||
|
}
|
||||||
|
// Verify magic number.
|
||||||
|
if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
|
||||||
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &cr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chunkReader) Close() error {
|
||||||
|
return closeAll(s.cs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
|
var (
|
||||||
|
seq = int(ref >> 32)
|
||||||
|
off = int((ref << 32) >> 32)
|
||||||
|
)
|
||||||
|
if seq >= len(s.bs) {
|
||||||
|
return nil, errors.Errorf("reference sequence %d out of range", seq)
|
||||||
|
}
|
||||||
|
b := s.bs[seq]
|
||||||
|
|
||||||
|
if int(off) >= len(b) {
|
||||||
|
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
||||||
|
}
|
||||||
|
b = b[off:]
|
||||||
|
|
||||||
|
l, n := binary.Uvarint(b)
|
||||||
|
if n < 0 {
|
||||||
|
return nil, fmt.Errorf("reading chunk length failed")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
enc := chunks.Encoding(b[0])
|
||||||
|
|
||||||
|
c, err := chunks.FromData(enc, b[1:1+l])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
|
}
|
|
@ -0,0 +1,16 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import "github.com/fabxc/tsdb/chunks"
|
||||||
|
|
||||||
|
type mockChunkReader struct {
|
||||||
|
chunk func(ref uint64) (chunks.Chunk, error)
|
||||||
|
close func() error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
|
return cr.chunk(ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *mockChunkReader) Close() error {
|
||||||
|
return cr.close()
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package tsdb
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"hash"
|
"hash"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"io"
|
"io"
|
||||||
|
@ -12,219 +13,18 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/fabxc/tsdb/chunks"
|
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// MagicSeries 4 bytes at the head of series file.
|
|
||||||
MagicSeries = 0x85BD40DD
|
|
||||||
|
|
||||||
// MagicIndex 4 bytes at the head of an index file.
|
// MagicIndex 4 bytes at the head of an index file.
|
||||||
MagicIndex = 0xBAAAD700
|
MagicIndex = 0xBAAAD700
|
||||||
)
|
|
||||||
|
|
||||||
const compactionPageBytes = minSectorSize * 64
|
|
||||||
|
|
||||||
// ChunkWriter serializes a time block of chunked series data.
|
|
||||||
type ChunkWriter interface {
|
|
||||||
// WriteChunks writes several chunks. The data field of the ChunkMetas
|
|
||||||
// must be populated.
|
|
||||||
// After returning successfully, the Ref fields in the ChunkMetas
|
|
||||||
// is set and can be used to retrieve the chunks from the written data.
|
|
||||||
WriteChunks(chunks ...ChunkMeta) error
|
|
||||||
|
|
||||||
// Close writes any required finalization and closes the resources
|
|
||||||
// associated with the underlying writer.
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// chunkWriter implements the ChunkWriter interface for the standard
|
|
||||||
// serialization format.
|
|
||||||
type chunkWriter struct {
|
|
||||||
dirFile *os.File
|
|
||||||
files []*os.File
|
|
||||||
wbuf *bufio.Writer
|
|
||||||
n int64
|
|
||||||
crc32 hash.Hash
|
|
||||||
|
|
||||||
segmentSize int64
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultChunkSegmentSize = 512 * 1024 * 1024
|
|
||||||
|
|
||||||
chunksFormatV1 = 1
|
|
||||||
indexFormatV1 = 1
|
indexFormatV1 = 1
|
||||||
)
|
)
|
||||||
|
|
||||||
func newChunkWriter(dir string) (*chunkWriter, error) {
|
const compactionPageBytes = minSectorSize * 64
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dirFile, err := fileutil.OpenDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cw := &chunkWriter{
|
|
||||||
dirFile: dirFile,
|
|
||||||
n: 0,
|
|
||||||
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
|
||||||
segmentSize: defaultChunkSegmentSize,
|
|
||||||
}
|
|
||||||
return cw, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) tail() *os.File {
|
|
||||||
if len(w.files) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return w.files[len(w.files)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// finalizeTail writes all pending data to the current tail file,
|
|
||||||
// truncates its size, and closes it.
|
|
||||||
func (w *chunkWriter) finalizeTail() error {
|
|
||||||
tf := w.tail()
|
|
||||||
if tf == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.wbuf.Flush(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := fileutil.Fsync(tf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// As the file was pre-allocated, we truncate any superfluous zero bytes.
|
|
||||||
off, err := tf.Seek(0, os.SEEK_CUR)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := tf.Truncate(off); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return tf.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) cut() error {
|
|
||||||
// Sync current tail to disk and close.
|
|
||||||
w.finalizeTail()
|
|
||||||
|
|
||||||
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err = w.dirFile.Sync(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write header metadata for new file.
|
|
||||||
|
|
||||||
metab := make([]byte, 8)
|
|
||||||
binary.BigEndian.PutUint32(metab[:4], MagicSeries)
|
|
||||||
metab[4] = chunksFormatV1
|
|
||||||
|
|
||||||
if _, err := f.Write(metab); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w.files = append(w.files, f)
|
|
||||||
if w.wbuf != nil {
|
|
||||||
w.wbuf.Reset(f)
|
|
||||||
} else {
|
|
||||||
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
|
|
||||||
}
|
|
||||||
w.n = 8
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) write(wr io.Writer, b []byte) error {
|
|
||||||
n, err := wr.Write(b)
|
|
||||||
w.n += int64(n)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
|
|
||||||
// Calculate maximum space we need and cut a new segment in case
|
|
||||||
// we don't fit into the current one.
|
|
||||||
maxLen := int64(binary.MaxVarintLen32)
|
|
||||||
for _, c := range chks {
|
|
||||||
maxLen += binary.MaxVarintLen32 + 1
|
|
||||||
maxLen += int64(len(c.Chunk.Bytes()))
|
|
||||||
}
|
|
||||||
newsz := w.n + maxLen
|
|
||||||
|
|
||||||
if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
|
|
||||||
if err := w.cut(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write chunks sequentially and set the reference field in the ChunkMeta.
|
|
||||||
w.crc32.Reset()
|
|
||||||
wr := io.MultiWriter(w.crc32, w.wbuf)
|
|
||||||
|
|
||||||
b := make([]byte, binary.MaxVarintLen32)
|
|
||||||
n := binary.PutUvarint(b, uint64(len(chks)))
|
|
||||||
|
|
||||||
if err := w.write(wr, b[:n]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
seq := uint64(w.seq()) << 32
|
|
||||||
|
|
||||||
for i := range chks {
|
|
||||||
chk := &chks[i]
|
|
||||||
|
|
||||||
chk.Ref = seq | uint64(w.n)
|
|
||||||
|
|
||||||
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
|
|
||||||
|
|
||||||
if err := w.write(wr, b[:n]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
chk.Chunk = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) seq() int {
|
|
||||||
return len(w.files) - 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *chunkWriter) Close() error {
|
|
||||||
return w.finalizeTail()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ChunkMeta holds information about a chunk of data.
|
|
||||||
type ChunkMeta struct {
|
|
||||||
// Ref and Chunk hold either a reference that can be used to retrieve
|
|
||||||
// chunk data or the data itself.
|
|
||||||
// Generally, only one of them is set.
|
|
||||||
Ref uint64
|
|
||||||
Chunk chunks.Chunk
|
|
||||||
|
|
||||||
MinTime, MaxTime int64 // time range the data covers
|
|
||||||
}
|
|
||||||
|
|
||||||
// IndexWriter serialized the index for a block of series data.
|
// IndexWriter serialized the index for a block of series data.
|
||||||
// The methods must generally be called in order they are specified.
|
// The methods must generally be called in order they are specified.
|
||||||
|
@ -609,3 +409,368 @@ func (w *indexWriter) Close() error {
|
||||||
}
|
}
|
||||||
return w.f.Close()
|
return w.f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IndexReader provides reading access of serialized index data.
|
||||||
|
type IndexReader interface {
|
||||||
|
// LabelValues returns the possible label values
|
||||||
|
LabelValues(names ...string) (StringTuples, error)
|
||||||
|
|
||||||
|
// Postings returns the postings list iterator for the label pair.
|
||||||
|
Postings(name, value string) (Postings, error)
|
||||||
|
|
||||||
|
// Series returns the series for the given reference.
|
||||||
|
Series(ref uint32) (labels.Labels, []ChunkMeta, error)
|
||||||
|
|
||||||
|
// LabelIndices returns the label pairs for which indices exist.
|
||||||
|
LabelIndices() ([][]string, error)
|
||||||
|
|
||||||
|
// Close released the underlying resources of the reader.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// StringTuples provides access to a sorted list of string tuples.
|
||||||
|
type StringTuples interface {
|
||||||
|
// Total number of tuples in the list.
|
||||||
|
Len() int
|
||||||
|
// At returns the tuple at position i.
|
||||||
|
At(i int) ([]string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type indexReader struct {
|
||||||
|
// The underlying byte slice holding the encoded series data.
|
||||||
|
b []byte
|
||||||
|
|
||||||
|
// Close that releases the underlying resources of the byte slice.
|
||||||
|
c io.Closer
|
||||||
|
|
||||||
|
// Cached hashmaps of section offsets.
|
||||||
|
labels map[string]uint32
|
||||||
|
postings map[string]uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errInvalidSize = fmt.Errorf("invalid size")
|
||||||
|
errInvalidFlag = fmt.Errorf("invalid flag")
|
||||||
|
)
|
||||||
|
|
||||||
|
// newIndexReader returns a new indexReader on the given directory.
|
||||||
|
func newIndexReader(dir string) (*indexReader, error) {
|
||||||
|
f, err := openMmapFile(filepath.Join(dir, "index"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r := &indexReader{b: f.b, c: f}
|
||||||
|
|
||||||
|
// Verify magic number.
|
||||||
|
if len(f.b) < 4 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||||
|
}
|
||||||
|
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
|
||||||
|
return nil, errors.Errorf("invalid magic number %x", m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The last two 4 bytes hold the pointers to the hashmaps.
|
||||||
|
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
|
||||||
|
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
|
||||||
|
|
||||||
|
flag, b, err := r.section(loff)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
|
||||||
|
}
|
||||||
|
if r.labels, err = readHashmap(flag, b); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read label index hashmap")
|
||||||
|
}
|
||||||
|
flag, b, err = r.section(poff)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
|
||||||
|
}
|
||||||
|
if r.postings, err = readHashmap(flag, b); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read postings hashmap")
|
||||||
|
}
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
|
||||||
|
if flag != flagStd {
|
||||||
|
return nil, errInvalidFlag
|
||||||
|
}
|
||||||
|
h := make(map[string]uint32, 512)
|
||||||
|
|
||||||
|
for len(b) > 0 {
|
||||||
|
l, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "read key length")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
if len(b) < int(l) {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "read key")
|
||||||
|
}
|
||||||
|
s := string(b[:l])
|
||||||
|
b = b[l:]
|
||||||
|
|
||||||
|
o, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "read offset value")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
h[s] = uint32(o)
|
||||||
|
}
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) Close() error {
|
||||||
|
return r.c.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
||||||
|
b := r.b[o:]
|
||||||
|
|
||||||
|
if len(b) < 5 {
|
||||||
|
return 0, nil, errors.Wrap(errInvalidSize, "read header")
|
||||||
|
}
|
||||||
|
|
||||||
|
flag := b[0]
|
||||||
|
l := binary.BigEndian.Uint32(b[1:5])
|
||||||
|
|
||||||
|
b = b[5:]
|
||||||
|
|
||||||
|
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
||||||
|
if len(b) < int(l)+4 {
|
||||||
|
return 0, nil, errors.Wrap(errInvalidSize, "section content")
|
||||||
|
}
|
||||||
|
return flag, b[:l], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||||
|
if int(o) > len(r.b) {
|
||||||
|
return "", errors.Errorf("invalid symbol offset %d", o)
|
||||||
|
}
|
||||||
|
l, n := binary.Uvarint(r.b[o:])
|
||||||
|
if n < 0 {
|
||||||
|
return "", errors.New("reading symbol length failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
end := int(o) + n + int(l)
|
||||||
|
if end > len(r.b) {
|
||||||
|
return "", errors.New("invalid length")
|
||||||
|
}
|
||||||
|
b := r.b[int(o)+n : end]
|
||||||
|
|
||||||
|
return yoloString(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
|
key := strings.Join(names, string(sep))
|
||||||
|
off, ok := r.labels[key]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("label index doesn't exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
flag, b, err := r.section(off)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "section at %d", off)
|
||||||
|
}
|
||||||
|
if flag != flagStd {
|
||||||
|
return nil, errInvalidFlag
|
||||||
|
}
|
||||||
|
l, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "read label index size")
|
||||||
|
}
|
||||||
|
|
||||||
|
st := &serializedStringTuples{
|
||||||
|
l: int(l),
|
||||||
|
b: b[n:],
|
||||||
|
lookup: r.lookupSymbol,
|
||||||
|
}
|
||||||
|
return st, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) LabelIndices() ([][]string, error) {
|
||||||
|
res := [][]string{}
|
||||||
|
|
||||||
|
for s := range r.labels {
|
||||||
|
res = append(res, strings.Split(s, string(sep)))
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
|
||||||
|
k, n := binary.Uvarint(r.b[ref:])
|
||||||
|
if n < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
|
||||||
|
}
|
||||||
|
|
||||||
|
b := r.b[int(ref)+n:]
|
||||||
|
lbls := make(labels.Labels, 0, k)
|
||||||
|
|
||||||
|
for i := 0; i < 2*int(k); i += 2 {
|
||||||
|
o, m := binary.Uvarint(b)
|
||||||
|
if m < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
||||||
|
}
|
||||||
|
n, err := r.lookupSymbol(uint32(o))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, errors.Wrap(err, "symbol lookup")
|
||||||
|
}
|
||||||
|
b = b[m:]
|
||||||
|
|
||||||
|
o, m = binary.Uvarint(b)
|
||||||
|
if m < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
||||||
|
}
|
||||||
|
v, err := r.lookupSymbol(uint32(o))
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, errors.Wrap(err, "symbol lookup")
|
||||||
|
}
|
||||||
|
b = b[m:]
|
||||||
|
|
||||||
|
lbls = append(lbls, labels.Label{
|
||||||
|
Name: n,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the chunks meta data.
|
||||||
|
k, n = binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
|
||||||
|
}
|
||||||
|
|
||||||
|
b = b[n:]
|
||||||
|
chunks := make([]ChunkMeta, 0, k)
|
||||||
|
|
||||||
|
for i := 0; i < int(k); i++ {
|
||||||
|
firstTime, n := binary.Varint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "first time")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
lastTime, n := binary.Varint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "last time")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
o, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
chunks = append(chunks, ChunkMeta{
|
||||||
|
Ref: o,
|
||||||
|
MinTime: firstTime,
|
||||||
|
MaxTime: lastTime,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return lbls, chunks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||||
|
key := name + string(sep) + value
|
||||||
|
|
||||||
|
off, ok := r.postings[key]
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
flag, b, err := r.section(off)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "section at %d", off)
|
||||||
|
}
|
||||||
|
|
||||||
|
if flag != flagStd {
|
||||||
|
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(fabxc): just read into memory as an intermediate solution.
|
||||||
|
// Add iterator over serialized data.
|
||||||
|
var l []uint32
|
||||||
|
|
||||||
|
for len(b) > 0 {
|
||||||
|
if len(b) < 4 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
|
||||||
|
}
|
||||||
|
l = append(l, binary.BigEndian.Uint32(b[:4]))
|
||||||
|
|
||||||
|
b = b[4:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return &listPostings{list: l, idx: -1}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type stringTuples struct {
|
||||||
|
l int // tuple length
|
||||||
|
s []string // flattened tuple entries
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStringTuples(s []string, l int) (*stringTuples, error) {
|
||||||
|
if len(s)%l != 0 {
|
||||||
|
return nil, errors.Wrap(errInvalidSize, "string tuple list")
|
||||||
|
}
|
||||||
|
return &stringTuples{s: s, l: l}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *stringTuples) Len() int { return len(t.s) / t.l }
|
||||||
|
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
|
||||||
|
|
||||||
|
func (t *stringTuples) Swap(i, j int) {
|
||||||
|
c := make([]string, t.l)
|
||||||
|
copy(c, t.s[i:i+t.l])
|
||||||
|
|
||||||
|
for k := 0; k < t.l; k++ {
|
||||||
|
t.s[i+k] = t.s[j+k]
|
||||||
|
t.s[j+k] = c[k]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *stringTuples) Less(i, j int) bool {
|
||||||
|
for k := 0; k < t.l; k++ {
|
||||||
|
d := strings.Compare(t.s[i+k], t.s[j+k])
|
||||||
|
|
||||||
|
if d < 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if d > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
type serializedStringTuples struct {
|
||||||
|
l int
|
||||||
|
b []byte
|
||||||
|
lookup func(uint32) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *serializedStringTuples) Len() int {
|
||||||
|
// TODO(fabxc): Cache this?
|
||||||
|
return len(t.b) / (4 * t.l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *serializedStringTuples) At(i int) ([]string, error) {
|
||||||
|
if len(t.b) < (i+t.l)*4 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
res := make([]string, 0, t.l)
|
||||||
|
|
||||||
|
for k := 0; k < t.l; k++ {
|
||||||
|
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
|
||||||
|
|
||||||
|
s, err := t.lookup(offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "symbol lookup")
|
||||||
|
}
|
||||||
|
res = append(res, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
|
@ -40,18 +40,6 @@ func (ir *mockIndexReader) Close() error {
|
||||||
return ir.close()
|
return ir.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockChunkReader struct {
|
|
||||||
chunk func(ref uint64) (chunks.Chunk, error)
|
|
||||||
close func() error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
|
||||||
return cr.chunk(ref)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cr *mockChunkReader) Close() error {
|
|
||||||
return cr.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPersistence_index_e2e(t *testing.T) {
|
func TestPersistence_index_e2e(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "test_persistence_e2e")
|
dir, err := ioutil.TempDir("", "test_persistence_e2e")
|
||||||
|
@ -153,7 +141,3 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||||
require.NoError(t, ir.Close())
|
require.NoError(t, ir.Close())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkPersistence_index_write(b *testing.B) {
|
|
||||||
|
|
||||||
}
|
|
459
reader.go
459
reader.go
|
@ -1,459 +0,0 @@
|
||||||
package tsdb
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/binary"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/fabxc/tsdb/chunks"
|
|
||||||
"github.com/fabxc/tsdb/labels"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ChunkReader provides reading access of serialized time series data.
|
|
||||||
type ChunkReader interface {
|
|
||||||
// Chunk returns the series data chunk with the given reference.
|
|
||||||
Chunk(ref uint64) (chunks.Chunk, error)
|
|
||||||
|
|
||||||
// Close releases all underlying resources of the reader.
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// chunkReader implements a SeriesReader for a serialized byte stream
|
|
||||||
// of series data.
|
|
||||||
type chunkReader struct {
|
|
||||||
// The underlying bytes holding the encoded series data.
|
|
||||||
bs [][]byte
|
|
||||||
|
|
||||||
// Closers for resources behind the byte slices.
|
|
||||||
cs []io.Closer
|
|
||||||
}
|
|
||||||
|
|
||||||
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
|
||||||
func newChunkReader(dir string) (*chunkReader, error) {
|
|
||||||
files, err := sequenceFiles(dir, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var cr chunkReader
|
|
||||||
|
|
||||||
for _, fn := range files {
|
|
||||||
f, err := openMmapFile(fn)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "mmap files")
|
|
||||||
}
|
|
||||||
cr.cs = append(cr.cs, f)
|
|
||||||
cr.bs = append(cr.bs, f.b)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, b := range cr.bs {
|
|
||||||
if len(b) < 4 {
|
|
||||||
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
|
||||||
}
|
|
||||||
// Verify magic number.
|
|
||||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
|
|
||||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &cr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *chunkReader) Close() error {
|
|
||||||
return closeAll(s.cs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
|
||||||
var (
|
|
||||||
seq = int(ref >> 32)
|
|
||||||
off = int((ref << 32) >> 32)
|
|
||||||
)
|
|
||||||
if seq >= len(s.bs) {
|
|
||||||
return nil, errors.Errorf("reference sequence %d out of range", seq)
|
|
||||||
}
|
|
||||||
b := s.bs[seq]
|
|
||||||
|
|
||||||
if int(off) >= len(b) {
|
|
||||||
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
|
||||||
}
|
|
||||||
b = b[off:]
|
|
||||||
|
|
||||||
l, n := binary.Uvarint(b)
|
|
||||||
if n < 0 {
|
|
||||||
return nil, fmt.Errorf("reading chunk length failed")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
enc := chunks.Encoding(b[0])
|
|
||||||
|
|
||||||
c, err := chunks.FromData(enc, b[1:1+l])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IndexReader provides reading access of serialized index data.
|
|
||||||
type IndexReader interface {
|
|
||||||
// LabelValues returns the possible label values
|
|
||||||
LabelValues(names ...string) (StringTuples, error)
|
|
||||||
|
|
||||||
// Postings returns the postings list iterator for the label pair.
|
|
||||||
Postings(name, value string) (Postings, error)
|
|
||||||
|
|
||||||
// Series returns the series for the given reference.
|
|
||||||
Series(ref uint32) (labels.Labels, []ChunkMeta, error)
|
|
||||||
|
|
||||||
// LabelIndices returns the label pairs for which indices exist.
|
|
||||||
LabelIndices() ([][]string, error)
|
|
||||||
|
|
||||||
// Close released the underlying resources of the reader.
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// StringTuples provides access to a sorted list of string tuples.
|
|
||||||
type StringTuples interface {
|
|
||||||
// Total number of tuples in the list.
|
|
||||||
Len() int
|
|
||||||
// At returns the tuple at position i.
|
|
||||||
At(i int) ([]string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type indexReader struct {
|
|
||||||
// The underlying byte slice holding the encoded series data.
|
|
||||||
b []byte
|
|
||||||
|
|
||||||
// Close that releases the underlying resources of the byte slice.
|
|
||||||
c io.Closer
|
|
||||||
|
|
||||||
// Cached hashmaps of section offsets.
|
|
||||||
labels map[string]uint32
|
|
||||||
postings map[string]uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
errInvalidSize = fmt.Errorf("invalid size")
|
|
||||||
errInvalidFlag = fmt.Errorf("invalid flag")
|
|
||||||
)
|
|
||||||
|
|
||||||
// newIndexReader returns a new indexReader on the given directory.
|
|
||||||
func newIndexReader(dir string) (*indexReader, error) {
|
|
||||||
f, err := openMmapFile(filepath.Join(dir, "index"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
r := &indexReader{b: f.b, c: f}
|
|
||||||
|
|
||||||
// Verify magic number.
|
|
||||||
if len(f.b) < 4 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
|
||||||
}
|
|
||||||
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
|
|
||||||
return nil, errors.Errorf("invalid magic number %x", m)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The last two 4 bytes hold the pointers to the hashmaps.
|
|
||||||
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
|
|
||||||
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
|
|
||||||
|
|
||||||
flag, b, err := r.section(loff)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
|
|
||||||
}
|
|
||||||
if r.labels, err = readHashmap(flag, b); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "read label index hashmap")
|
|
||||||
}
|
|
||||||
flag, b, err = r.section(poff)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
|
|
||||||
}
|
|
||||||
if r.postings, err = readHashmap(flag, b); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "read postings hashmap")
|
|
||||||
}
|
|
||||||
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
|
|
||||||
if flag != flagStd {
|
|
||||||
return nil, errInvalidFlag
|
|
||||||
}
|
|
||||||
h := make(map[string]uint32, 512)
|
|
||||||
|
|
||||||
for len(b) > 0 {
|
|
||||||
l, n := binary.Uvarint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "read key length")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
|
|
||||||
if len(b) < int(l) {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "read key")
|
|
||||||
}
|
|
||||||
s := string(b[:l])
|
|
||||||
b = b[l:]
|
|
||||||
|
|
||||||
o, n := binary.Uvarint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "read offset value")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
|
|
||||||
h[s] = uint32(o)
|
|
||||||
}
|
|
||||||
|
|
||||||
return h, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) Close() error {
|
|
||||||
return r.c.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|
||||||
b := r.b[o:]
|
|
||||||
|
|
||||||
if len(b) < 5 {
|
|
||||||
return 0, nil, errors.Wrap(errInvalidSize, "read header")
|
|
||||||
}
|
|
||||||
|
|
||||||
flag := b[0]
|
|
||||||
l := binary.BigEndian.Uint32(b[1:5])
|
|
||||||
|
|
||||||
b = b[5:]
|
|
||||||
|
|
||||||
// b must have the given length plus 4 bytes for the CRC32 checksum.
|
|
||||||
if len(b) < int(l)+4 {
|
|
||||||
return 0, nil, errors.Wrap(errInvalidSize, "section content")
|
|
||||||
}
|
|
||||||
return flag, b[:l], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
|
||||||
if int(o) > len(r.b) {
|
|
||||||
return "", errors.Errorf("invalid symbol offset %d", o)
|
|
||||||
}
|
|
||||||
l, n := binary.Uvarint(r.b[o:])
|
|
||||||
if n < 0 {
|
|
||||||
return "", errors.New("reading symbol length failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
end := int(o) + n + int(l)
|
|
||||||
if end > len(r.b) {
|
|
||||||
return "", errors.New("invalid length")
|
|
||||||
}
|
|
||||||
b := r.b[int(o)+n : end]
|
|
||||||
|
|
||||||
return yoloString(b), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
|
||||||
key := strings.Join(names, string(sep))
|
|
||||||
off, ok := r.labels[key]
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("label index doesn't exist")
|
|
||||||
}
|
|
||||||
|
|
||||||
flag, b, err := r.section(off)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "section at %d", off)
|
|
||||||
}
|
|
||||||
if flag != flagStd {
|
|
||||||
return nil, errInvalidFlag
|
|
||||||
}
|
|
||||||
l, n := binary.Uvarint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "read label index size")
|
|
||||||
}
|
|
||||||
|
|
||||||
st := &serializedStringTuples{
|
|
||||||
l: int(l),
|
|
||||||
b: b[n:],
|
|
||||||
lookup: r.lookupSymbol,
|
|
||||||
}
|
|
||||||
return st, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) LabelIndices() ([][]string, error) {
|
|
||||||
res := [][]string{}
|
|
||||||
|
|
||||||
for s := range r.labels {
|
|
||||||
res = append(res, strings.Split(s, string(sep)))
|
|
||||||
}
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
|
|
||||||
k, n := binary.Uvarint(r.b[ref:])
|
|
||||||
if n < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
|
|
||||||
}
|
|
||||||
|
|
||||||
b := r.b[int(ref)+n:]
|
|
||||||
lbls := make(labels.Labels, 0, k)
|
|
||||||
|
|
||||||
for i := 0; i < 2*int(k); i += 2 {
|
|
||||||
o, m := binary.Uvarint(b)
|
|
||||||
if m < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
|
||||||
}
|
|
||||||
n, err := r.lookupSymbol(uint32(o))
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, errors.Wrap(err, "symbol lookup")
|
|
||||||
}
|
|
||||||
b = b[m:]
|
|
||||||
|
|
||||||
o, m = binary.Uvarint(b)
|
|
||||||
if m < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
|
||||||
}
|
|
||||||
v, err := r.lookupSymbol(uint32(o))
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, errors.Wrap(err, "symbol lookup")
|
|
||||||
}
|
|
||||||
b = b[m:]
|
|
||||||
|
|
||||||
lbls = append(lbls, labels.Label{
|
|
||||||
Name: n,
|
|
||||||
Value: v,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the chunks meta data.
|
|
||||||
k, n = binary.Uvarint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
|
|
||||||
}
|
|
||||||
|
|
||||||
b = b[n:]
|
|
||||||
chunks := make([]ChunkMeta, 0, k)
|
|
||||||
|
|
||||||
for i := 0; i < int(k); i++ {
|
|
||||||
firstTime, n := binary.Varint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "first time")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
|
|
||||||
lastTime, n := binary.Varint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "last time")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
|
|
||||||
o, n := binary.Uvarint(b)
|
|
||||||
if n < 1 {
|
|
||||||
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
|
|
||||||
}
|
|
||||||
b = b[n:]
|
|
||||||
|
|
||||||
chunks = append(chunks, ChunkMeta{
|
|
||||||
Ref: o,
|
|
||||||
MinTime: firstTime,
|
|
||||||
MaxTime: lastTime,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return lbls, chunks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
|
||||||
key := name + string(sep) + value
|
|
||||||
|
|
||||||
off, ok := r.postings[key]
|
|
||||||
if !ok {
|
|
||||||
return nil, ErrNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
flag, b, err := r.section(off)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "section at %d", off)
|
|
||||||
}
|
|
||||||
|
|
||||||
if flag != flagStd {
|
|
||||||
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(fabxc): just read into memory as an intermediate solution.
|
|
||||||
// Add iterator over serialized data.
|
|
||||||
var l []uint32
|
|
||||||
|
|
||||||
for len(b) > 0 {
|
|
||||||
if len(b) < 4 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
|
|
||||||
}
|
|
||||||
l = append(l, binary.BigEndian.Uint32(b[:4]))
|
|
||||||
|
|
||||||
b = b[4:]
|
|
||||||
}
|
|
||||||
|
|
||||||
return &listPostings{list: l, idx: -1}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type stringTuples struct {
|
|
||||||
l int // tuple length
|
|
||||||
s []string // flattened tuple entries
|
|
||||||
}
|
|
||||||
|
|
||||||
func newStringTuples(s []string, l int) (*stringTuples, error) {
|
|
||||||
if len(s)%l != 0 {
|
|
||||||
return nil, errors.Wrap(errInvalidSize, "string tuple list")
|
|
||||||
}
|
|
||||||
return &stringTuples{s: s, l: l}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *stringTuples) Len() int { return len(t.s) / t.l }
|
|
||||||
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
|
|
||||||
|
|
||||||
func (t *stringTuples) Swap(i, j int) {
|
|
||||||
c := make([]string, t.l)
|
|
||||||
copy(c, t.s[i:i+t.l])
|
|
||||||
|
|
||||||
for k := 0; k < t.l; k++ {
|
|
||||||
t.s[i+k] = t.s[j+k]
|
|
||||||
t.s[j+k] = c[k]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *stringTuples) Less(i, j int) bool {
|
|
||||||
for k := 0; k < t.l; k++ {
|
|
||||||
d := strings.Compare(t.s[i+k], t.s[j+k])
|
|
||||||
|
|
||||||
if d < 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if d > 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
type serializedStringTuples struct {
|
|
||||||
l int
|
|
||||||
b []byte
|
|
||||||
lookup func(uint32) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *serializedStringTuples) Len() int {
|
|
||||||
// TODO(fabxc): Cache this?
|
|
||||||
return len(t.b) / (4 * t.l)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *serializedStringTuples) At(i int) ([]string, error) {
|
|
||||||
if len(t.b) < (i+t.l)*4 {
|
|
||||||
return nil, errInvalidSize
|
|
||||||
}
|
|
||||||
res := make([]string, 0, t.l)
|
|
||||||
|
|
||||||
for k := 0; k < t.l; k++ {
|
|
||||||
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
|
|
||||||
|
|
||||||
s, err := t.lookup(offset)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "symbol lookup")
|
|
||||||
}
|
|
||||||
res = append(res, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
Loading…
Reference in New Issue