Coalesce series reads where we can.

When compacting, rather than reading every series in the index once
per label name, read the series for many label names in a single pass,
but only when doing so won't use (much) more RAM than writing the
special all-postings index already does.
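To illustrate the idea (a minimal sketch only, assuming the per-label-name
postings counts are known up front; batchLabelNames and its arguments are
made-up names, not the actual index.Writer code), label names are greedily
grouped into batches whose combined postings stay within the size of the
all-postings list, and each batch is then served by one pass over the series:

package main

import "fmt"

// batchLabelNames groups label names so that one pass over all series can
// collect postings for several names at once, while the postings held in
// memory for any batch never exceed maxPostings (the length of the
// all-postings list, which the writer has to hold anyway).
func batchLabelNames(names []string, counts map[string]uint64, maxPostings uint64) [][]string {
	var batches [][]string
	for len(names) > 0 {
		batch := []string{}
		var c uint64
		for len(names) > 0 && c+counts[names[0]] <= maxPostings {
			c += counts[names[0]]
			batch = append(batch, names[0])
			names = names[1:]
		}
		if len(batch) == 0 {
			// A single name is over budget on its own; read it in a pass of its own.
			batch, names = names[:1], names[1:]
		}
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	counts := map[string]uint64{"job": 3, "instance": 2, "path": 4, "code": 1}
	fmt.Println(batchLabelNames([]string{"job", "instance", "path", "code"}, counts, 5))
	// With a budget of 5 postings per batch this prints [[job instance] [path code]],
	// i.e. two passes over the series instead of four.
}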

original in-memory postings:
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                  1        1202383447 ns/op        158936496 B/op   1031511 allocs/op
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                  1        1141792706 ns/op        154453408 B/op   1093453 allocs/op
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                  1        1169288829 ns/op        161072336 B/op   1110021 allocs/op
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                  1        1115700103 ns/op        149480472 B/op   1129180 allocs/op
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                  1        1283813141 ns/op        162937800 B/op   1202771 allocs/op

before:
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                  1        1145195941 ns/op        131749984 B/op    834400 allocs/op
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                  1        1233526345 ns/op        127889416 B/op    897033 allocs/op
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                  1        1821942296 ns/op        131665648 B/op    914836 allocs/op
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                  1        8035568665 ns/op        123811832 B/op    934312 allocs/op
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                  1       71325926267 ns/op        140722648 B/op   1016824 allocs/op

after:
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                  1        1101429174 ns/op        129063496 B/op    832571 allocs/op
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                  1        1074466374 ns/op        124154888 B/op    894875 allocs/op
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                  1        1166510282 ns/op        128790648 B/op    912931 allocs/op
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                  1        1075013071 ns/op        120570696 B/op    933511 allocs/op
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                  1        1231673790 ns/op        138754288 B/op   1022791 allocs/op
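The numbers above come from the new BenchmarkCompactionFromHead with allocation
reporting enabled; presumably something along these lines reproduces them (the
./tsdb/ package path is an assumption, adjust to wherever the benchmark lives
in your checkout):

go test -run '^$' -bench BenchmarkCompactionFromHead -benchmem ./tsdb/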

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
pull/6452/head
Brian Brazil 5 years ago
parent 373a1fdfbf
commit 971dafdfbe

@@ -860,6 +860,36 @@ func BenchmarkCompaction(b *testing.B) {
 	}
 }
 
+func BenchmarkCompactionFromHead(b *testing.B) {
+	dir, err := ioutil.TempDir("", "bench_compaction_from_head")
+	testutil.Ok(b, err)
+	defer func() {
+		testutil.Ok(b, os.RemoveAll(dir))
+	}()
+	totalSeries := 100000
+	for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
+		labelValues := totalSeries / labelNames
+		b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
+			h, err := NewHead(nil, nil, nil, 1000)
+			testutil.Ok(b, err)
+			for ln := 0; ln < labelNames; ln++ {
+				app := h.Appender()
+				for lv := 0; lv < labelValues; lv++ {
+					app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
+				}
+				testutil.Ok(b, app.Commit())
+			}
+
+			b.ResetTimer()
+			b.ReportAllocs()
+			for i := 0; i < b.N; i++ {
+				createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h)
+			}
+			h.Close()
+		})
+	}
+}
+
 // TestDisableAutoCompactions checks that we can
 // disable and enable the auto compaction.
 // This is needed for unit tests that rely on

@@ -268,7 +268,7 @@ func (d *Decbuf) Byte() byte {
 	return x
 }
 
-func (d *Decbuf) EatPadding() {
+func (d *Decbuf) ConsumePadding() {
 	if d.E != nil {
 		return
 	}
@@ -279,7 +279,6 @@ func (d *Decbuf) EatPadding() {
 		d.E = ErrInvalidSize
 		return
 	}
-	return
 }
 
 func (d *Decbuf) Err() error { return d.E }

@@ -126,7 +126,7 @@ type Writer struct {
 	reverseSymbols map[uint32]string
 	labelIndexes   []labelIndexHashEntry // label index offsets
 	postings       []postingsHashEntry   // postings lists offsets
-	labelNames     map[string]struct{}   // label names
+	labelNames     map[string]uint64     // label names, and their usage
 
 	// Hold last series to validate that clients insert new series in order.
 	lastSeries labels.Labels
@@ -208,9 +208,7 @@ func NewWriter(fn string) (*Writer, error) {
 		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 
 		// Caches.
-		symbols:        make(map[string]uint32, 1<<13),
-		reverseSymbols: make(map[uint32]string, 1<<13),
-		labelNames:     make(map[string]struct{}, 1<<8),
+		labelNames: make(map[string]uint64, 1<<8),
 		crc32:      newCRC32(),
 	}
 	if err := iw.writeMeta(); err != nil {
@@ -337,7 +335,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
 		if !ok {
 			return errors.Errorf("symbol entry for %q does not exist", l.Name)
 		}
-		w.labelNames[l.Name] = struct{}{}
+		w.labelNames[l.Name]++
 		w.buf2.PutUvarint32(index)
 
 		index, ok = w.symbols[l.Value]
@@ -409,6 +407,7 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
 	}
 
 	w.symbols = make(map[string]uint32, len(symbols))
+	w.reverseSymbols = make(map[uint32]string, len(symbols))
 
 	for index, s := range symbols {
 		w.symbols[s] = uint32(index)
@@ -596,7 +595,6 @@ func (w *Writer) writeTOC() error {
 }
 
 func (w *Writer) writePostings() error {
 	names := make([]string, 0, len(w.labelNames))
 	for n := range w.labelNames {
 		names = append(names, n)
@@ -617,7 +615,7 @@ func (w *Writer) writePostings() error {
 	d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
 	d.B = d.B[w.toc.Series:] // dec.Skip not merged yet
 	for d.Len() > 0 {
-		d.EatPadding()
+		d.ConsumePadding()
 		startPos := w.toc.LabelIndices - uint64(d.Len())
 		if startPos%16 != 0 {
 			return errors.Errorf("series not 16-byte aligned at %d", startPos)
@@ -630,32 +628,53 @@ func (w *Writer) writePostings() error {
 			return nil
 		}
 	}
-	w.writePosting("", "", offsets)
+	if err := w.writePosting("", "", offsets); err != nil {
+		return err
+	}
+	maxPostings := uint64(len(offsets)) // No label name can have more postings than this.
 
-	for _, name := range names {
-		nameo := w.symbols[name]
-		postings := map[uint32][]uint32{}
+	for len(names) > 0 {
+		batchNames := []string{}
+		var c uint64
+		// Try to bunch up label names into one loop, but avoid
+		// using more memory than a single label name can.
+		for len(names) > 0 {
+			if w.labelNames[names[0]]+c > maxPostings {
+				break
+			}
+			batchNames = append(batchNames, names[0])
+			names = names[1:]
+		}
+
+		nameSymbols := map[uint32]struct{}{}
+		for _, name := range batchNames {
+			nameSymbols[w.symbols[name]] = struct{}{}
+		}
+		// Label name -> label value -> positions.
+		postings := map[uint32]map[uint32][]uint32{}
 
 		d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
-		d.B = d.B[w.toc.Series:] // dec.Skip not merged yet
+		d.Skip(int(w.toc.Series))
 		for d.Len() > 0 {
-			d.EatPadding()
+			d.ConsumePadding()
 			startPos := w.toc.LabelIndices - uint64(d.Len())
 			l := d.Uvarint() // Length of this series in bytes.
 			startLen := d.Len()
 
-			// See if this label name is in the series.
+			// See if label names we want are in the series.
 			numLabels := d.Uvarint()
 			for i := 0; i < numLabels; i++ {
 				lno := uint32(d.Uvarint())
 				lvo := uint32(d.Uvarint())
 
-				if lno == nameo {
-					if _, ok := postings[lvo]; !ok {
-						postings[lvo] = []uint32{}
+				if _, ok := nameSymbols[lno]; ok {
+					if _, ok := postings[lno]; !ok {
+						postings[lno] = map[uint32][]uint32{}
 					}
-					postings[lvo] = append(postings[lvo], uint32(startPos/16))
-					break
+					if _, ok := postings[lno][lvo]; !ok {
+						postings[lno][lvo] = []uint32{}
+					}
+					postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16))
 				}
 			}
 			// Skip to next series. The 4 is for the CRC32.
@@ -666,16 +685,20 @@ func (w *Writer) writePostings() error {
 			}
 		}
 
-		// Write out postings for this label name.
-		values := make([]uint32, 0, len(postings))
-		for v := range postings {
-			values = append(values, v)
-		}
-		// Symbol numbers are in order, so the strings will also be in order.
-		sort.Sort(uint32slice(values))
-		for _, v := range values {
-			w.writePosting(name, w.reverseSymbols[v], postings[v])
-		}
+		for _, name := range batchNames {
+			// Write out postings for this label name.
+			values := make([]uint32, 0, len(postings[w.symbols[name]]))
+			for v := range postings[w.symbols[name]] {
+				values = append(values, v)
+			}
+			// Symbol numbers are in order, so the strings will also be in order.
+			sort.Sort(uint32slice(values))
+			for _, v := range values {
+				if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil {
+					return err
+				}
+			}
+		}
 	}
 }

@@ -211,6 +211,7 @@ func TestIndexRW_Postings(t *testing.T) {
 	testutil.Ok(t, iw.AddSeries(4, series[3]))
 
 	err = iw.WriteLabelIndex([]string{"a"}, []string{"1"})
+	testutil.Ok(t, err)
 	err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})
 	testutil.Ok(t, err)
@@ -266,6 +267,7 @@ func TestPostingsMany(t *testing.T) {
 		testutil.Ok(t, iw.AddSeries(uint64(i), s))
 	}
 	err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"})
+	testutil.Ok(t, err)
 	testutil.Ok(t, iw.Close())
 
 	ir, err := NewFileReader(fn)

@@ -62,7 +62,6 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk
 }
 
 func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil }
-func (mockIndexWriter) WritePostings() error { return nil }
 func (mockIndexWriter) Close() error { return nil }
 
 type mockBReader struct {
