|
|
@ -11,7 +11,6 @@ import (
|
|
|
|
"sort"
|
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/bradfitz/slice"
|
|
|
|
|
|
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
|
|
"github.com/fabxc/tsdb/chunks"
|
|
|
|
"github.com/fabxc/tsdb/chunks"
|
|
|
|
"github.com/fabxc/tsdb/labels"
|
|
|
|
"github.com/fabxc/tsdb/labels"
|
|
|
@ -262,6 +261,10 @@ type indexWriter struct {
|
|
|
|
n int64
|
|
|
|
n int64
|
|
|
|
started bool
|
|
|
|
started bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Reusable memory.
|
|
|
|
|
|
|
|
b []byte
|
|
|
|
|
|
|
|
uint32s []uint32
|
|
|
|
|
|
|
|
|
|
|
|
series map[uint32]*indexWriterSeries
|
|
|
|
series map[uint32]*indexWriterSeries
|
|
|
|
symbols map[string]uint32 // symbol offsets
|
|
|
|
symbols map[string]uint32 // symbol offsets
|
|
|
|
labelIndexes []hashEntry // label index offsets
|
|
|
|
labelIndexes []hashEntry // label index offsets
|
|
|
@ -285,10 +288,16 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|
|
|
|
|
|
|
|
|
|
|
iw := &indexWriter{
|
|
|
|
iw := &indexWriter{
|
|
|
|
f: f,
|
|
|
|
f: f,
|
|
|
|
bufw: bufio.NewWriterSize(f, 1*1024*1024),
|
|
|
|
bufw: bufio.NewWriterSize(f, 1<<22),
|
|
|
|
n: 0,
|
|
|
|
n: 0,
|
|
|
|
symbols: make(map[string]uint32, 4096),
|
|
|
|
|
|
|
|
series: make(map[uint32]*indexWriterSeries, 4096),
|
|
|
|
// Reusable memory.
|
|
|
|
|
|
|
|
b: make([]byte, 0, 1<<23),
|
|
|
|
|
|
|
|
uint32s: make([]uint32, 0, 1<<15),
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Caches.
|
|
|
|
|
|
|
|
symbols: make(map[string]uint32, 1<<13),
|
|
|
|
|
|
|
|
series: make(map[uint32]*indexWriterSeries, 1<<16),
|
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := iw.writeMeta(); err != nil {
|
|
|
|
if err := iw.writeMeta(); err != nil {
|
|
|
@ -304,12 +313,12 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// section writes a CRC32 checksummed section of length l and guarded by flag.
|
|
|
|
// section writes a CRC32 checksummed section of length l and guarded by flag.
|
|
|
|
func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error {
|
|
|
|
func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
|
|
|
|
w.crc32.Reset()
|
|
|
|
w.crc32.Reset()
|
|
|
|
wr := io.MultiWriter(w.crc32, w.bufw)
|
|
|
|
wr := io.MultiWriter(w.crc32, w.bufw)
|
|
|
|
|
|
|
|
|
|
|
|
b := [5]byte{flag, 0, 0, 0, 0}
|
|
|
|
b := [5]byte{flag, 0, 0, 0, 0}
|
|
|
|
binary.BigEndian.PutUint32(b[1:], l)
|
|
|
|
binary.BigEndian.PutUint32(b[1:], uint32(l))
|
|
|
|
|
|
|
|
|
|
|
|
if err := w.write(wr, b[:]); err != nil {
|
|
|
|
if err := w.write(wr, b[:]); err != nil {
|
|
|
|
return errors.Wrap(err, "writing header")
|
|
|
|
return errors.Wrap(err, "writing header")
|
|
|
@ -363,74 +372,77 @@ func (w *indexWriter) writeSymbols() error {
|
|
|
|
base := uint32(w.n) + 5
|
|
|
|
base := uint32(w.n) + 5
|
|
|
|
|
|
|
|
|
|
|
|
buf := [binary.MaxVarintLen32]byte{}
|
|
|
|
buf := [binary.MaxVarintLen32]byte{}
|
|
|
|
b := append(make([]byte, 0, 4096), flagStd)
|
|
|
|
w.b = append(w.b[:0], flagStd)
|
|
|
|
|
|
|
|
|
|
|
|
for _, s := range symbols {
|
|
|
|
for _, s := range symbols {
|
|
|
|
w.symbols[s] = base + uint32(len(b))
|
|
|
|
w.symbols[s] = base + uint32(len(w.b))
|
|
|
|
|
|
|
|
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(s)))
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(s)))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
b = append(b, s...)
|
|
|
|
w.b = append(w.b, s...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
l := uint32(len(b))
|
|
|
|
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
|
|
|
|
|
|
|
return w.write(wr, w.b)
|
|
|
|
return w.section(l, flagStd, func(wr io.Writer) error {
|
|
|
|
|
|
|
|
return w.write(wr, b)
|
|
|
|
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type indexWriterSeriesSlice []*indexWriterSeries
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s indexWriterSeriesSlice) Len() int { return len(s) }
|
|
|
|
|
|
|
|
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
|
|
|
|
|
|
|
return labels.Compare(s[i].labels, s[j].labels) < 0
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (w *indexWriter) writeSeries() error {
|
|
|
|
func (w *indexWriter) writeSeries() error {
|
|
|
|
// Series must be stored sorted along their labels.
|
|
|
|
// Series must be stored sorted along their labels.
|
|
|
|
series := make([]*indexWriterSeries, 0, len(w.series))
|
|
|
|
series := make(indexWriterSeriesSlice, 0, len(w.series))
|
|
|
|
|
|
|
|
|
|
|
|
for _, s := range w.series {
|
|
|
|
for _, s := range w.series {
|
|
|
|
series = append(series, s)
|
|
|
|
series = append(series, s)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
slice.Sort(series, func(i, j int) bool {
|
|
|
|
sort.Sort(series)
|
|
|
|
return labels.Compare(series[i].labels, series[j].labels) < 0
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Current end of file plus 5 bytes for section header.
|
|
|
|
// Current end of file plus 5 bytes for section header.
|
|
|
|
// TODO(fabxc): switch to relative offsets.
|
|
|
|
// TODO(fabxc): switch to relative offsets.
|
|
|
|
base := uint32(w.n) + 5
|
|
|
|
base := uint32(w.n) + 5
|
|
|
|
|
|
|
|
|
|
|
|
b := make([]byte, 0, 1<<20) // 1MiB
|
|
|
|
w.b = w.b[:0]
|
|
|
|
buf := make([]byte, binary.MaxVarintLen64)
|
|
|
|
buf := make([]byte, binary.MaxVarintLen64)
|
|
|
|
|
|
|
|
|
|
|
|
for _, s := range series {
|
|
|
|
for _, s := range series {
|
|
|
|
// Write label set symbol references.
|
|
|
|
// Write label set symbol references.
|
|
|
|
s.offset = base + uint32(len(b))
|
|
|
|
s.offset = base + uint32(len(w.b))
|
|
|
|
|
|
|
|
|
|
|
|
n := binary.PutUvarint(buf, uint64(len(s.labels)))
|
|
|
|
n := binary.PutUvarint(buf, uint64(len(s.labels)))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
|
|
|
|
|
|
|
|
for _, l := range s.labels {
|
|
|
|
for _, l := range s.labels {
|
|
|
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
|
|
|
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
|
|
|
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Write chunks meta data including reference into chunk file.
|
|
|
|
// Write chunks meta data including reference into chunk file.
|
|
|
|
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
|
|
|
|
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
|
|
|
|
|
|
|
|
for _, c := range s.chunks {
|
|
|
|
for _, c := range s.chunks {
|
|
|
|
n = binary.PutVarint(buf, c.MinTime)
|
|
|
|
n = binary.PutVarint(buf, c.MinTime)
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
n = binary.PutVarint(buf, c.MaxTime)
|
|
|
|
n = binary.PutVarint(buf, c.MaxTime)
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
|
|
|
|
|
|
|
|
n = binary.PutUvarint(buf, uint64(c.Ref))
|
|
|
|
n = binary.PutUvarint(buf, uint64(c.Ref))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
l := uint32(len(b))
|
|
|
|
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
|
|
|
|
|
|
|
return w.write(wr, w.b)
|
|
|
|
return w.section(l, flagStd, func(wr io.Writer) error {
|
|
|
|
|
|
|
|
return w.write(wr, b)
|
|
|
|
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -467,7 +479,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
|
|
|
buf := make([]byte, binary.MaxVarintLen32)
|
|
|
|
buf := make([]byte, binary.MaxVarintLen32)
|
|
|
|
n := binary.PutUvarint(buf, uint64(len(names)))
|
|
|
|
n := binary.PutUvarint(buf, uint64(len(names)))
|
|
|
|
|
|
|
|
|
|
|
|
l := uint32(n) + uint32(len(values)*4)
|
|
|
|
l := n + len(values)*4
|
|
|
|
|
|
|
|
|
|
|
|
return w.section(l, flagStd, func(wr io.Writer) error {
|
|
|
|
return w.section(l, flagStd, func(wr io.Writer) error {
|
|
|
|
// First byte indicates tuple size for index.
|
|
|
|
// First byte indicates tuple size for index.
|
|
|
@ -500,13 +512,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
|
|
|
offset: uint32(w.n),
|
|
|
|
offset: uint32(w.n),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
b := make([]byte, 0, 4096)
|
|
|
|
|
|
|
|
buf := [4]byte{}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Order of the references in the postings list does not imply order
|
|
|
|
// Order of the references in the postings list does not imply order
|
|
|
|
// of the series references within the persisted block they are mapped to.
|
|
|
|
// of the series references within the persisted block they are mapped to.
|
|
|
|
// We have to sort the new references again.
|
|
|
|
// We have to sort the new references again.
|
|
|
|
var refs []uint32
|
|
|
|
refs := w.uint32s[:0]
|
|
|
|
|
|
|
|
|
|
|
|
for it.Next() {
|
|
|
|
for it.Next() {
|
|
|
|
s, ok := w.series[it.At()]
|
|
|
|
s, ok := w.series[it.At()]
|
|
|
@ -519,38 +528,49 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] })
|
|
|
|
sort.Sort(uint32slice(refs))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
w.b = w.b[:0]
|
|
|
|
|
|
|
|
buf := make([]byte, 4)
|
|
|
|
|
|
|
|
|
|
|
|
for _, r := range refs {
|
|
|
|
for _, r := range refs {
|
|
|
|
binary.BigEndian.PutUint32(buf[:], r)
|
|
|
|
binary.BigEndian.PutUint32(buf, r)
|
|
|
|
b = append(b, buf[:]...)
|
|
|
|
w.b = append(w.b, buf...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
|
|
|
|
w.uint32s = refs[:0]
|
|
|
|
return w.write(wr, b)
|
|
|
|
|
|
|
|
|
|
|
|
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
|
|
|
|
|
|
|
return w.write(wr, w.b)
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type uint32slice []uint32
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s uint32slice) Len() int { return len(s) }
|
|
|
|
|
|
|
|
func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
|
|
|
|
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
|
|
|
|
|
|
|
|
|
|
|
|
type hashEntry struct {
|
|
|
|
type hashEntry struct {
|
|
|
|
name string
|
|
|
|
name string
|
|
|
|
offset uint32
|
|
|
|
offset uint32
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (w *indexWriter) writeHashmap(h []hashEntry) error {
|
|
|
|
func (w *indexWriter) writeHashmap(h []hashEntry) error {
|
|
|
|
b := make([]byte, 0, 4096)
|
|
|
|
w.b = w.b[:0]
|
|
|
|
buf := [binary.MaxVarintLen32]byte{}
|
|
|
|
buf := [binary.MaxVarintLen32]byte{}
|
|
|
|
|
|
|
|
|
|
|
|
for _, e := range h {
|
|
|
|
for _, e := range h {
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
|
|
|
|
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
b = append(b, e.name...)
|
|
|
|
w.b = append(w.b, e.name...)
|
|
|
|
|
|
|
|
|
|
|
|
n = binary.PutUvarint(buf[:], uint64(e.offset))
|
|
|
|
n = binary.PutUvarint(buf[:], uint64(e.offset))
|
|
|
|
b = append(b, buf[:n]...)
|
|
|
|
w.b = append(w.b, buf[:n]...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
|
|
|
|
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
|
|
|
return w.write(wr, b)
|
|
|
|
return w.write(wr, w.b)
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|