diff --git a/db.go b/db.go index 064dd2e2a..eb640b103 100644 --- a/db.go +++ b/db.go @@ -264,7 +264,7 @@ func (db *DB) compact() error { db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p)) if err := db.compactor.Compact(p...); err != nil { - return errors.Wrapf(err, "compact", p) + return errors.Wrapf(err, "compact %s", p) } changes = true } diff --git a/writer.go b/writer.go index bafb0e8cd..264052d32 100644 --- a/writer.go +++ b/writer.go @@ -11,7 +11,6 @@ import ( "sort" "strings" - "github.com/bradfitz/slice" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" @@ -262,6 +261,10 @@ type indexWriter struct { n int64 started bool + // Reusable memory. + b []byte + uint32s []uint32 + series map[uint32]*indexWriterSeries symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets @@ -284,11 +287,17 @@ func newIndexWriter(dir string) (*indexWriter, error) { } iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1*1024*1024), - n: 0, - symbols: make(map[string]uint32, 4096), - series: make(map[uint32]*indexWriterSeries, 4096), + f: f, + bufw: bufio.NewWriterSize(f, 1<<22), + n: 0, + + // Reusable memory. + b: make([]byte, 0, 1<<23), + uint32s: make([]uint32, 0, 1<<15), + + // Caches. + symbols: make(map[string]uint32, 1<<13), + series: make(map[uint32]*indexWriterSeries, 1<<16), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := iw.writeMeta(); err != nil { @@ -304,12 +313,12 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { } // section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { +func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { w.crc32.Reset() wr := io.MultiWriter(w.crc32, w.bufw) b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], l) + binary.BigEndian.PutUint32(b[1:], uint32(l)) if err := w.write(wr, b[:]); err != nil { return errors.Wrap(err, "writing header") @@ -363,74 +372,77 @@ func (w *indexWriter) writeSymbols() error { base := uint32(w.n) + 5 buf := [binary.MaxVarintLen32]byte{} - b := append(make([]byte, 0, 4096), flagStd) + w.b = append(w.b[:0], flagStd) for _, s := range symbols { - w.symbols[s] = base + uint32(len(b)) + w.symbols[s] = base + uint32(len(w.b)) n := binary.PutUvarint(buf[:], uint64(len(s))) - b = append(b, buf[:n]...) - b = append(b, s...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, s...) } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type indexWriterSeriesSlice []*indexWriterSeries + +func (s indexWriterSeriesSlice) Len() int { return len(s) } +func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s indexWriterSeriesSlice) Less(i, j int) bool { + return labels.Compare(s[i].labels, s[j].labels) < 0 +} + func (w *indexWriter) writeSeries() error { // Series must be stored sorted along their labels. - series := make([]*indexWriterSeries, 0, len(w.series)) + series := make(indexWriterSeriesSlice, 0, len(w.series)) for _, s := range w.series { series = append(series, s) } - slice.Sort(series, func(i, j int) bool { - return labels.Compare(series[i].labels, series[j].labels) < 0 - }) + sort.Sort(series) // Current end of file plus 5 bytes for section header. // TODO(fabxc): switch to relative offsets. base := uint32(w.n) + 5 - b := make([]byte, 0, 1<<20) // 1MiB + w.b = w.b[:0] buf := make([]byte, binary.MaxVarintLen64) for _, s := range series { // Write label set symbol references. - s.offset = base + uint32(len(b)) + s.offset = base + uint32(len(w.b)) n := binary.PutUvarint(buf, uint64(len(s.labels))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, l := range s.labels { n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } // Write chunks meta data including reference into chunk file. n = binary.PutUvarint(buf, uint64(len(s.chunks))) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) for _, c := range s.chunks { n = binary.PutVarint(buf, c.MinTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutVarint(buf, c.MaxTime) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) n = binary.PutUvarint(buf, uint64(c.Ref)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } } - l := uint32(len(b)) - - return w.section(l, flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } @@ -467,7 +479,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { buf := make([]byte, binary.MaxVarintLen32) n := binary.PutUvarint(buf, uint64(len(names))) - l := uint32(n) + uint32(len(values)*4) + l := n + len(values)*4 return w.section(l, flagStd, func(wr io.Writer) error { // First byte indicates tuple size for index. @@ -500,13 +512,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { offset: uint32(w.n), }) - b := make([]byte, 0, 4096) - buf := [4]byte{} - // Order of the references in the postings list does not imply order // of the series references within the persisted block they are mapped to. // We have to sort the new references again. - var refs []uint32 + refs := w.uint32s[:0] for it.Next() { s, ok := w.series[it.At()] @@ -519,38 +528,49 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { return err } - slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] }) + sort.Sort(uint32slice(refs)) + + w.b = w.b[:0] + buf := make([]byte, 4) for _, r := range refs { - binary.BigEndian.PutUint32(buf[:], r) - b = append(b, buf[:]...) + binary.BigEndian.PutUint32(buf, r) + w.b = append(w.b, buf...) } - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + w.uint32s = refs[:0] + + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) } +type uint32slice []uint32 + +func (s uint32slice) Len() int { return len(s) } +func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } + type hashEntry struct { name string offset uint32 } func (w *indexWriter) writeHashmap(h []hashEntry) error { - b := make([]byte, 0, 4096) + w.b = w.b[:0] buf := [binary.MaxVarintLen32]byte{} for _, e := range h { n := binary.PutUvarint(buf[:], uint64(len(e.name))) - b = append(b, buf[:n]...) - b = append(b, e.name...) + w.b = append(w.b, buf[:n]...) + w.b = append(w.b, e.name...) n = binary.PutUvarint(buf[:], uint64(e.offset)) - b = append(b, buf[:n]...) + w.b = append(w.b, buf[:n]...) } - return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error { - return w.write(wr, b) + return w.section(len(w.b), flagStd, func(wr io.Writer) error { + return w.write(wr, w.b) }) }