mirror of https://github.com/prometheus/prometheus
Optimize postings offset table reading (#11535)
* Add BenchmarkOpenBlock
* Use specific types when reading offset table
  Instead of reading a generic-ish []string, we can read a generic type which would be specifically labels.Label. This avoids allocating a slice that escapes to the heap, making it both faster and more efficient in terms of memory management.
* Update error message for unexpected number of keys
* s/posting offset table/postings offset table/
* Remove useless lastKey assignment
* Use two []bytes vars, simplify
  Applied PR feedback: removed generics, moved the label indices reading to that specific test as we're not using it in production anyway, we're just testing what we've just built. Also using two []bytes variables for name and value that use the backing buffer instead of using strings; this reduces allocations a lot as we only copy them when we store them (this is optimized by the compiler).
* Fix the dumb bug
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
pull/11579/head
parent
960b6b609a
commit
8553a98267
|
@ -74,10 +74,20 @@ func TestSetCompactionFailed(t *testing.T) {
|
||||||
func TestCreateBlock(t *testing.T) {
|
func TestCreateBlock(t *testing.T) {
|
||||||
tmpdir := t.TempDir()
|
tmpdir := t.TempDir()
|
||||||
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
|
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
|
||||||
if err == nil {
|
|
||||||
require.NoError(t, b.Close())
|
|
||||||
}
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkOpenBlock(b *testing.B) {
|
||||||
|
tmpdir := b.TempDir()
|
||||||
|
blockDir := createBlock(b, tmpdir, genSeries(1e6, 20, 0, 10))
|
||||||
|
b.Run("benchmark", func(b *testing.B) {
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
block, err := OpenBlock(nil, blockDir, nil)
|
||||||
|
require.NoError(b, err)
|
||||||
|
require.NoError(b, block.Close())
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCorruptedChunk(t *testing.T) {
|
func TestCorruptedChunk(t *testing.T) {
|
||||||
|
|
|
@ -1164,44 +1164,37 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
||||||
// Earlier V1 formats don't have a sorted postings offset table, so
|
// Earlier V1 formats don't have a sorted postings offset table, so
|
||||||
// load the whole offset table into memory.
|
// load the whole offset table into memory.
|
||||||
r.postingsV1 = map[string]map[string]uint64{}
|
r.postingsV1 = map[string]map[string]uint64{}
|
||||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
|
if err := ReadPostingsOffsetTable(r.b, r.toc.PostingsTable, func(name, value []byte, off uint64, _ int) error {
|
||||||
if len(key) != 2 {
|
if _, ok := r.postingsV1[string(name)]; !ok {
|
||||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
r.postingsV1[string(name)] = map[string]uint64{}
|
||||||
|
r.postings[string(name)] = nil // Used to get a list of labelnames in places.
|
||||||
}
|
}
|
||||||
if _, ok := r.postingsV1[key[0]]; !ok {
|
r.postingsV1[string(name)][string(value)] = off
|
||||||
r.postingsV1[key[0]] = map[string]uint64{}
|
|
||||||
r.postings[key[0]] = nil // Used to get a list of labelnames in places.
|
|
||||||
}
|
|
||||||
r.postingsV1[key[0]][key[1]] = off
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, errors.Wrap(err, "read postings table")
|
return nil, errors.Wrap(err, "read postings table")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
var lastKey []string
|
var lastName, lastValue []byte
|
||||||
lastOff := 0
|
lastOff := 0
|
||||||
valueCount := 0
|
valueCount := 0
|
||||||
// For the postings offset table we keep every label name but only every nth
|
// For the postings offset table we keep every label name but only every nth
|
||||||
// label value (plus the first and last one), to save memory.
|
// label value (plus the first and last one), to save memory.
|
||||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
|
if err := ReadPostingsOffsetTable(r.b, r.toc.PostingsTable, func(name, value []byte, _ uint64, off int) error {
|
||||||
if len(key) != 2 {
|
if _, ok := r.postings[string(name)]; !ok {
|
||||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
|
||||||
}
|
|
||||||
if _, ok := r.postings[key[0]]; !ok {
|
|
||||||
// Next label name.
|
// Next label name.
|
||||||
r.postings[key[0]] = []postingOffset{}
|
r.postings[string(name)] = []postingOffset{}
|
||||||
if lastKey != nil {
|
if lastName != nil {
|
||||||
// Always include last value for each label name.
|
// Always include last value for each label name.
|
||||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
r.postings[string(lastName)] = append(r.postings[string(lastName)], postingOffset{value: string(lastValue), off: lastOff})
|
||||||
}
|
}
|
||||||
lastKey = nil
|
|
||||||
valueCount = 0
|
valueCount = 0
|
||||||
}
|
}
|
||||||
if valueCount%symbolFactor == 0 {
|
if valueCount%symbolFactor == 0 {
|
||||||
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
|
r.postings[string(name)] = append(r.postings[string(name)], postingOffset{value: string(value), off: off})
|
||||||
lastKey = nil
|
lastName, lastValue = nil, nil
|
||||||
} else {
|
} else {
|
||||||
lastKey = key
|
lastName, lastValue = name, value
|
||||||
lastOff = off
|
lastOff = off
|
||||||
}
|
}
|
||||||
valueCount++
|
valueCount++
|
||||||
|
@ -1209,8 +1202,8 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, errors.Wrap(err, "read postings table")
|
return nil, errors.Wrap(err, "read postings table")
|
||||||
}
|
}
|
||||||
if lastKey != nil {
|
if lastName != nil {
|
||||||
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
|
r.postings[string(lastName)] = append(r.postings[string(lastName)], postingOffset{value: string(lastValue), off: lastOff})
|
||||||
}
|
}
|
||||||
// Trim any extra space in the slices.
|
// Trim any extra space in the slices.
|
||||||
for k, v := range r.postings {
|
for k, v := range r.postings {
|
||||||
|
@ -1251,15 +1244,12 @@ type Range struct {
|
||||||
// for all postings lists.
|
// for all postings lists.
|
||||||
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
||||||
m := map[labels.Label]Range{}
|
m := map[labels.Label]Range{}
|
||||||
if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
|
if err := ReadPostingsOffsetTable(r.b, r.toc.PostingsTable, func(name, value []byte, off uint64, _ int) error {
|
||||||
if len(key) != 2 {
|
|
||||||
return errors.Errorf("unexpected key length for posting table %d", len(key))
|
|
||||||
}
|
|
||||||
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
||||||
if d.Err() != nil {
|
if d.Err() != nil {
|
||||||
return d.Err()
|
return d.Err()
|
||||||
}
|
}
|
||||||
m[labels.Label{Name: key[0], Value: key[1]}] = Range{
|
m[labels.Label{Name: string(name), Value: string(value)}] = Range{
|
||||||
Start: int64(off) + 4,
|
Start: int64(off) + 4,
|
||||||
End: int64(off) + 4 + int64(d.Len()),
|
End: int64(off) + 4 + int64(d.Len()),
|
||||||
}
|
}
|
||||||
|
@ -1412,29 +1402,29 @@ func (s *symbolsIter) Next() bool {
|
||||||
func (s symbolsIter) At() string { return s.cur }
|
func (s symbolsIter) At() string { return s.cur }
|
||||||
func (s symbolsIter) Err() error { return s.err }
|
func (s symbolsIter) Err() error { return s.err }
|
||||||
|
|
||||||
// ReadOffsetTable reads an offset table and at the given position calls f for each
|
// ReadPostingsOffsetTable reads the postings offset table and at the given position calls f for each
|
||||||
// found entry. If f returns an error it stops decoding and returns the received error.
|
// found entry.
|
||||||
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error {
|
// The name and value parameters passed to f reuse the backing memory of the underlying byte slice,
|
||||||
|
// so they shouldn't be persisted without previously copying them.
|
||||||
|
// If f returns an error it stops decoding and returns the received error.
|
||||||
|
func ReadPostingsOffsetTable(bs ByteSlice, off uint64, f func(name, value []byte, postingsOffset uint64, labelOffset int) error) error {
|
||||||
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
|
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
|
||||||
startLen := d.Len()
|
startLen := d.Len()
|
||||||
cnt := d.Be32()
|
cnt := d.Be32()
|
||||||
|
|
||||||
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||||
offsetPos := startLen - d.Len()
|
offsetPos := startLen - d.Len()
|
||||||
keyCount := d.Uvarint()
|
|
||||||
// The Postings offset table takes only 2 keys per entry (name and value of label),
|
|
||||||
// and the LabelIndices offset table takes only 1 key per entry (a label name).
|
|
||||||
// Hence setting the size to max of both, i.e. 2.
|
|
||||||
keys := make([]string, 0, 2)
|
|
||||||
|
|
||||||
for i := 0; i < keyCount; i++ {
|
if keyCount := d.Uvarint(); keyCount != 2 {
|
||||||
keys = append(keys, d.UvarintStr())
|
return errors.Errorf("unexpected number of keys for postings offset table %d", keyCount)
|
||||||
}
|
}
|
||||||
|
name := d.UvarintBytes()
|
||||||
|
value := d.UvarintBytes()
|
||||||
o := d.Uvarint64()
|
o := d.Uvarint64()
|
||||||
if d.Err() != nil {
|
if d.Err() != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err := f(keys, o, offsetPos); err != nil {
|
if err := f(name, value, o, offsetPos); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cnt--
|
cnt--
|
||||||
|
|
|
@ -210,28 +210,31 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||||
require.NoError(t, p.Err())
|
require.NoError(t, p.Err())
|
||||||
|
|
||||||
// The label indices are no longer used, so test them by hand here.
|
// The label indices are no longer used, so test them by hand here.
|
||||||
labelIndices := map[string][]string{}
|
labelValuesOffsets := map[string]uint64{}
|
||||||
require.NoError(t, ReadOffsetTable(ir.b, ir.toc.LabelIndicesTable, func(key []string, off uint64, _ int) error {
|
d := encoding.NewDecbufAt(ir.b, int(ir.toc.LabelIndicesTable), castagnoliTable)
|
||||||
if len(key) != 1 {
|
cnt := d.Be32()
|
||||||
return errors.Errorf("unexpected key length for label indices table %d", len(key))
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||||
|
require.Equal(t, 1, d.Uvarint(), "Unexpected number of keys for label indices table")
|
||||||
|
lbl := d.UvarintStr()
|
||||||
|
off := d.Uvarint64()
|
||||||
|
labelValuesOffsets[lbl] = off
|
||||||
|
cnt--
|
||||||
|
}
|
||||||
|
require.NoError(t, d.Err())
|
||||||
|
|
||||||
|
labelIndices := map[string][]string{}
|
||||||
|
for lbl, off := range labelValuesOffsets {
|
||||||
d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable)
|
d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable)
|
||||||
vals := []string{}
|
require.Equal(t, 1, d.Be32int(), "Unexpected number of label indices table names")
|
||||||
nc := d.Be32int()
|
for i := d.Be32(); i > 0 && d.Err() == nil; i-- {
|
||||||
if nc != 1 {
|
|
||||||
return errors.Errorf("unexpected number of label indices table names %d", nc)
|
|
||||||
}
|
|
||||||
for i := d.Be32(); i > 0; i-- {
|
|
||||||
v, err := ir.lookupSymbol(d.Be32())
|
v, err := ir.lookupSymbol(d.Be32())
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
return err
|
labelIndices[lbl] = append(labelIndices[lbl], v)
|
||||||
}
|
|
||||||
vals = append(vals, v)
|
|
||||||
}
|
}
|
||||||
labelIndices[key[0]] = vals
|
require.NoError(t, d.Err())
|
||||||
return d.Err()
|
}
|
||||||
}))
|
|
||||||
require.Equal(t, map[string][]string{
|
require.Equal(t, map[string][]string{
|
||||||
"a": {"1"},
|
"a": {"1"},
|
||||||
"b": {"1", "2", "3", "4"},
|
"b": {"1", "2", "3", "4"},
|
||||||
|
|
Loading…
Reference in New Issue