From b334c3ade8a3819097d6e6956dc5854109c7facf Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 12 Dec 2016 15:39:55 +0100 Subject: [PATCH] Write chunk skiplist and add series reader --- querier.go | 4 +-- reader.go | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++-- writer.go | 29 ++++++++++++---- 3 files changed, 121 insertions(+), 12 deletions(-) diff --git a/querier.go b/querier.go index dd5c160fe..d64f6129a 100644 --- a/querier.go +++ b/querier.go @@ -34,9 +34,9 @@ type Querier interface { // Series represents a single time series. type Series interface { - Labels() Labels + Labels() (Labels, error) // Iterator returns a new iterator of the data of the series. - Iterator() SeriesIterator + Iterator() (SeriesIterator, error) } // SeriesIterator iterates over the data of a time series. diff --git a/reader.go b/reader.go index fa0dca9b5..c4c5dd721 100644 --- a/reader.go +++ b/reader.go @@ -188,18 +188,112 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { if flag != flagStd { return nil, errInvalidFlag } - if len(b) < 1 { + l, n := binary.Uvarint(b) + if n < 1 { return nil, errInvalidSize } st := &serializedStringTuples{ - l: int(b[0]), - b: b[1:], + l: int(l), + b: b[n:], lookup: r.lookupSymbol, } return st, nil } +func (r *indexReader) Series(ref uint32) (Series, error) { + k, n := binary.Uvarint(r.b[ref:]) + if n < 1 { + return nil, errInvalidSize + } + + b := r.b[int(ref)+n:] + offsets := make([]uint32, 0, k) + + for i := 0; i < int(k); i++ { + o, n := binary.Uvarint(b) + if n < 1 { + return nil, errInvalidSize + } + offsets = append(offsets, uint32(o)) + + b = b[n:] + } + // Offests must occur in pairs representing name and value. + if len(offsets)&1 != 0 { + return nil, errInvalidSize + } + + // TODO(fabxc): Fully materialize series for now. Figure out later if it + // makes sense to decode those lazily. + // The references are expected to be sorted and match the order of + // the underlying strings. + labels := make(Labels, 0, k) + + for i := 0; i < int(k); i += 2 { + n, err := r.lookupSymbol(offsets[i]) + if err != nil { + return nil, err + } + v, err := r.lookupSymbol(offsets[i+1]) + if err != nil { + return nil, err + } + labels = append(labels, Label{ + Name: string(n), + Value: string(v), + }) + } + + // Read the chunk offsets. + k, n = binary.Uvarint(r.b[ref:]) + if n < 1 { + return nil, errInvalidSize + } + + b = b[n:] + coffsets := make([]ChunkOffset, 0, k) + + for i := 0; i < int(k); i++ { + v, n := binary.Varint(b) + if n < 1 { + return nil, errInvalidSize + } + b = b[n:] + + o, n := binary.Uvarint(b) + if n < 1 { + return nil, errInvalidSize + } + b = b[n:] + + coffsets = append(coffsets, ChunkOffset{ + Offset: uint32(o), + Value: v, + }) + } + + s := &series{ + labels: labels, + offsets: coffsets, + } + return s, nil +} + +type series struct { + labels Labels + offsets []ChunkOffset +} + +func (s *series) Labels() (Labels, error) { + return s.labels, nil +} + +func (s *series) Iterator() (SeriesIterator, error) { + // dereference skiplist and construct from chunk iterators. + return nil, nil +} + type stringTuples struct { l int // tuple length s []string // flattened tuple entries diff --git a/writer.go b/writer.go index 470746c4c..6bba27a50 100644 --- a/writer.go +++ b/writer.go @@ -295,18 +295,31 @@ func (w *indexWriter) writeSymbols() error { func (w *indexWriter) writeSeries() error { b := make([]byte, 0, 4096) - buf := [binary.MaxVarintLen32]byte{} + buf := make([]byte, binary.MaxVarintLen64) for _, s := range w.series { + // Write label set symbol references. s.offset = uint32(w.n) + uint32(len(b)) - n := binary.PutUvarint(buf[:], uint64(len(s.labels))) + n := binary.PutUvarint(buf, uint64(len(s.labels))) b = append(b, buf[:n]...) for _, l := range s.labels { - n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Name])) + n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) b = append(b, buf[:n]...) - n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Value])) + n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) + b = append(b, buf[:n]...) + } + + // Write skiplist to chunk offsets. + n = binary.PutUvarint(buf, uint64(len(s.chunks))) + b = append(b, buf[:n]...) + + for _, c := range s.chunks { + n = binary.PutVarint(buf, c.Value) + b = append(b, buf[:n]...) + + n = binary.PutUvarint(buf, uint64(c.Offset)) b = append(b, buf[:n]...) } } @@ -330,14 +343,16 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { offset: uint32(w.n), }) - l := 1 + uint32(len(values)*4) + buf := make([]byte, binary.MaxVarintLen32) + n := binary.PutUvarint(buf, uint64(len(names))) + + l := uint32(n) + uint32(len(values)*4) return w.section(l, flagStd, func(wr io.Writer) error { // First byte indicates tuple size for index. - if err := w.write(wr, []byte{byte(len(names))}); err != nil { + if err := w.write(wr, buf[:n]); err != nil { return err } - buf := make([]byte, 4) for _, v := range valt.s { binary.BigEndian.PutUint32(buf, w.symbols[v])