diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 01b843653..b5e0984c1 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -134,7 +134,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { measureTime("ingestScrapes", func() { b.startProfiling() - if err := b.ingestScrapes(metrics, 1000); err != nil { + if err := b.ingestScrapes(metrics, 3000); err != nil { exitWithError(err) } }) diff --git a/db.go b/db.go index c27e491ba..54e4632bd 100644 --- a/db.go +++ b/db.go @@ -282,8 +282,8 @@ func (s *SeriesShard) persist() error { defer sw.Close() defer iw.Close() - for _, cd := range head.index.forward { - if err := sw.WriteSeries(cd.lset, []*chunkDesc{cd}); err != nil { + for ref, cd := range head.index.forward { + if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil { return err } } @@ -297,8 +297,15 @@ func (s *SeriesShard) persist() error { s = append(s, x) } - iw.WriteLabelIndex([]string{n}, s) + if err := iw.WriteLabelIndex([]string{n}, s); err != nil { + return err + } + } + for t := range head.index.postings.m { + if err := iw.WritePostings(t.name, t.value, head.index.postings.get(t)); err != nil { + return err + } } sz := fmt.Sprintf("%fMiB", float64(sw.Size()+iw.Size())/1024/1024) diff --git a/index.go b/index.go index f236410a5..7faf45af6 100644 --- a/index.go +++ b/index.go @@ -24,7 +24,7 @@ func newMemIndex() *memIndex { lastID: 0, forward: make(map[uint32]*chunkDesc), values: make(map[string]stringset), - postings: &memPostings{m: make(map[string][]uint32)}, + postings: &memPostings{m: make(map[term][]uint32)}, } } @@ -32,22 +32,20 @@ func (ix *memIndex) numSeries() int { return len(ix.forward) } -func (ix *memIndex) Postings(s string) Iterator { - return ix.postings.get(s) +func (ix *memIndex) Postings(t term) Iterator { + return ix.postings.get(t) +} + +type term struct { + name, value string } func (ix *memIndex) add(chkd *chunkDesc) { // Add each label pair as a term to the inverted index. - terms := make([]string, 0, len(chkd.lset)) - b := make([]byte, 0, 64) + terms := make([]term, 0, len(chkd.lset)) for _, l := range chkd.lset { - b = append(b, l.Name...) - b = append(b, sep) - b = append(b, l.Value...) - - terms = append(terms, string(b)) - b = b[:0] + terms = append(terms, term{name: l.Name, value: l.Value}) // Add to label name to values index. valset, ok := ix.values[l.Name] @@ -67,17 +65,17 @@ func (ix *memIndex) add(chkd *chunkDesc) { } type memPostings struct { - m map[string][]uint32 + m map[term][]uint32 } // Postings returns an iterator over the postings list for s. -func (p *memPostings) get(s string) Iterator { - return &listIterator{list: p.m[s], idx: -1} +func (p *memPostings) get(t term) Iterator { + return &listIterator{list: p.m[t], idx: -1} } // add adds a document to the index. The caller has to ensure that no // term argument appears twice. -func (p *memPostings) add(id uint32, terms ...string) { +func (p *memPostings) add(id uint32, terms ...term) { for _, t := range terms { p.m[t] = append(p.m[t], id) } diff --git a/writer.go b/writer.go index 252cd84b3..d1a773f28 100644 --- a/writer.go +++ b/writer.go @@ -21,7 +21,9 @@ const ( // SeriesWriter serializes a time block of chunked series data. type SeriesWriter interface { // WriteSeries writes the time series data chunks for a single series. - WriteSeries(Labels, []*chunkDesc) error + // The reference is used to resolve the correct series in the written index. + // It only has to be valid for the duration of the write. + WriteSeries(ref uint32, l Labels, cds []*chunkDesc) error // Size returns the size of the data written so far. Size() int64 @@ -64,7 +66,7 @@ func (w *seriesWriter) writeMeta() error { return w.write(w.w, metab) } -func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error { +func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) error { // Initialize with meta data. if w.n == 0 { if err := w.writeMeta(); err != nil { @@ -109,7 +111,7 @@ func (w *seriesWriter) WriteSeries(lset Labels, chks []*chunkDesc) error { } if w.index != nil { - w.index.AddOffsets(lset, offsets...) + w.index.AddSeries(ref, lset, offsets...) } return nil } @@ -142,9 +144,11 @@ type BlockStats struct { // IndexWriter serialized the index for a block of series data. // The methods must generally be called in order they are specified. type IndexWriter interface { - // AddOffsets populates the index writer with offsets of chunks - // for a series that the index can reference. - AddOffsets(Labels, ...ChunkOffset) + // AddSeries populates the index writer witha series and its offsets + // of chunks that the index can reference. + // The reference number is used to resolve a series against the postings + // list iterator. It only has to be available during the write processing. + AddSeries(ref uint32, l Labels, o ...ChunkOffset) // WriteStats writes final stats for the indexed block. WriteStats(*BlockStats) error @@ -164,18 +168,23 @@ type IndexWriter interface { Close() error } +type indexWriterSeries struct { + labels Labels + chunks []ChunkOffset // series file offset of chunks + offset uint32 // index file offset of series reference +} + // indexWriter implements the IndexWriter interface for the standard // serialization format. type indexWriter struct { w io.Writer n int64 - series []Labels // series in data section - offsets [][]ChunkOffset // chunk offsets per series - seriesOffsets []uint32 // offset of series references + series map[uint32]*indexWriterSeries symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets + postings []hashEntry // postings lists offsets } func newIndexWriter(w io.Writer) *indexWriter { @@ -183,6 +192,7 @@ func newIndexWriter(w io.Writer) *indexWriter { w: w, n: 0, symbols: make(map[string]uint32, 4096), + series: make(map[uint32]*indexWriterSeries, 4096), labelIndexes: make([]hashEntry, 10), } } @@ -218,15 +228,17 @@ func (w *indexWriter) writeMeta() error { return w.write(w.w, metab) } -func (w *indexWriter) AddOffsets(lset Labels, offsets ...ChunkOffset) { - w.series = append(w.series, lset) - w.offsets = append(w.offsets, offsets) - +func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) { // Populate the symbol table from all label sets we have to reference. for _, l := range lset { w.symbols[l.Name] = 0 w.symbols[l.Value] = 0 } + + w.series[ref] = &indexWriterSeries{ + labels: lset, + chunks: offsets, + } } func (w *indexWriter) WriteStats(*BlockStats) error { @@ -274,11 +286,13 @@ func (w *indexWriter) writeSeries() error { b := make([]byte, 0, 4096) buf := [binary.MaxVarintLen32]byte{} - for _, lset := range w.series { - for _, l := range lset { - n := binary.PutUvarint(buf[:], uint64(len(lset))) - b = append(b, buf[:n]...) + for _, s := range w.series { + s.offset = uint32(w.n) + uint32(len(b)) + + n := binary.PutUvarint(buf[:], uint64(len(s.labels))) + b = append(b, buf[:n]...) + for _, l := range s.labels { n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Name])) b = append(b, buf[:n]...) n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Value])) @@ -320,7 +334,23 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } func (w *indexWriter) WritePostings(name, value string, it Iterator) error { - return nil + key := name + string(sep) + value + + w.postings = append(w.postings, hashEntry{ + name: key, + offset: uint32(w.n), + }) + + b := make([]byte, 0, 4096) + + for it.Next() { + v := w.series[it.Value()].offset + b = append(b, ((*[4]byte)(unsafe.Pointer(&v)))[:]...) + } + + return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { + return w.write(wr, b) + }) } func (w *indexWriter) Size() int64 { @@ -337,7 +367,7 @@ const hashEntrySize = uint32(unsafe.Sizeof(hashEntry{})) func (w *indexWriter) finalize() error { l := 1 + uint32(len(w.labelIndexes))*hashEntrySize - return w.section(l, flagStd, func(wr io.Writer) error { + err := w.section(l, flagStd, func(wr io.Writer) error { for _, e := range w.labelIndexes { b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:] @@ -347,6 +377,24 @@ func (w *indexWriter) finalize() error { } return nil }) + + if err != nil { + return err + } + + err = w.section(l, flagStd, func(wr io.Writer) error { + for _, e := range w.postings { + b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:] + + if err := w.write(w.w, b); err != nil { + return nil + } + } + return nil + }) + // TODO(fabxc): write hashmap offsets. + + return err } func (w *indexWriter) Close() error {