From 1e0edf367b872a8be511b884f493fb035f71bc9d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 9 Dec 2016 21:23:34 +0100 Subject: [PATCH] Write index with symbol table --- db.go | 23 ++++++++++++---- writer.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/db.go b/db.go index e65ac877b..30f16ffb5 100644 --- a/db.go +++ b/db.go @@ -267,19 +267,32 @@ func (s *SeriesShard) persist() error { return err } - f, err := os.Create(filepath.Join(p, "series")) + sf, err := os.Create(filepath.Join(p, "series")) + if err != nil { + return err + } + xf, err := os.Create(filepath.Join(p, "index")) if err != nil { return err } - w := newSeriesWriter(f, s.head.baseTimestamp) - defer w.Close() + iw := newIndexWriter(xf) + sw := newSeriesWriter(sf, iw, s.head.baseTimestamp) + + defer sw.Close() + defer iw.Close() for _, cd := range head.index.forward { - w.WriteSeries(cd.lset, []*chunkDesc{cd}) + if err := sw.WriteSeries(cd.lset, []*chunkDesc{cd}); err != nil { + return err + } } - sz := fmt.Sprintf("%fMiB", float64(w.Size())/1024/1024) + if err := iw.WriteStats(nil); err != nil { + return err + } + + sz := fmt.Sprintf("%fMiB", float64(sw.Size()+iw.Size())/1024/1024) s.logger.With("size", sz). With("samples", head.samples). diff --git a/writer.go b/writer.go index e78a1ab7e..eebb45801 100644 --- a/writer.go +++ b/writer.go @@ -1,9 +1,11 @@ package tsdb import ( + "encoding/binary" "hash/crc32" "io" "os" + "sort" "unsafe" ) @@ -37,15 +39,13 @@ type seriesWriter struct { baseTimestamp int64 index IndexWriter - - chunkOffsets map[uint32][]uint32 - seriesOffsets map[uint32]uint32 } -func newSeriesWriter(w io.Writer, base int64) *seriesWriter { +func newSeriesWriter(w io.Writer, index IndexWriter, base int64) *seriesWriter { return &seriesWriter{ w: w, n: 0, + index: index, baseTimestamp: base, } } @@ -148,9 +148,6 @@ type IndexWriter interface { // WriteStats writes final stats for the indexed block. 
WriteStats(*BlockStats) error - // WriteSymbols serializes all encountered string symbols. - WriteSymbols([]string) error - // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. WriteLabelIndex(names []string, values []string) error @@ -164,7 +161,7 @@ type IndexWriter interface { // Size returns the size of the data written so far. Size() int64 - // Closes writes any finalization and closes theresources associated with + // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error } @@ -177,19 +174,80 @@ type indexWriter struct { series []Labels offsets [][]ChunkOffset + + symbols map[string]uint32 +} + +func newIndexWriter(w io.Writer) *indexWriter { + return &indexWriter{ + w: w, + n: 0, + symbols: make(map[string]uint32), + } +} + +func (w *indexWriter) write(wr io.Writer, b []byte) error { + n, err := wr.Write(b) + w.n += int64(n) + return err +} + +func (w *indexWriter) writeMeta() error { + meta := &meta{magic: MagicSeries, flag: flagStd} + metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:] + + return w.write(w.w, metab) } func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) { w.series = append(w.series, lset) w.offsets = append(w.offsets, offsets) + + // Populate the symbol table from all label sets we have to reference. + for _, l := range lset { + w.symbols[l.Name] = 0 + w.symbols[l.Value] = 0 + } } func (w *indexWriter) WriteStats(*BlockStats) error { + if w.n == 0 { + if err := w.writeMeta(); err != nil { + return err + } + if err := w.writeSymbols(); err != nil { + return err + } + } return nil } -func (w *indexWriter) WriteSymbols(symbols []string) error { - return nil +func (w *indexWriter) writeSymbols() error { + // Generate sorted list of strings we will store as reference table.
+ symbols := make([]string, 0, len(w.symbols)) + for s := range w.symbols { + symbols = append(symbols, s) + } + sort.Strings(symbols) + + h := crc32.NewIEEE() + wr := io.MultiWriter(h, w.w) + + buf := make([]byte, binary.MaxVarintLen32) + + for _, s := range symbols { + n := binary.PutUvarint(buf, uint64(len(s))) + w.symbols[s] = uint32(w.n) + + if err := w.write(wr, buf[:n]); err != nil { + return err + } + if err := w.write(wr, []byte(s)); err != nil { + return err + } + } + + return w.write(w.w, h.Sum(nil)) } func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { @@ -207,6 +265,7 @@ func (w *indexWriter) WritePostings(name, value string, it Iterator) error { func (w *indexWriter) Size() int64 { return w.n } + func (w *indexWriter) Close() error { if f, ok := w.w.(*os.File); ok { if err := f.Sync(); err != nil {