mirror of https://github.com/prometheus/prometheus
Merge pull request #158 from prometheus/cachesym
Allocate and cache strings for persisted blocks
pull/5805/head
commit
665d1fd451
|
@ -18,14 +18,14 @@ It is terminated by a table of contents which serves as an entry point into the
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ Label Index N │ │
|
│ │ Label Index N │ │
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ Label Index Table │ │
|
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
|
||||||
│ │ Postings 1 │ │
|
│ │ Postings 1 │ │
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ ... │ │
|
│ │ ... │ │
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ Postings N │ │
|
│ │ Postings N │ │
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
|
│ │ Label Index Table │ │
|
||||||
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ Postings Table │ │
|
│ │ Postings Table │ │
|
||||||
│ ├──────────────────────────────────────────────┤ │
|
│ ├──────────────────────────────────────────────┤ │
|
||||||
│ │ TOC │ │
|
│ │ TOC │ │
|
||||||
|
|
7
block.go
7
block.go
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/prometheus/tsdb/labels"
|
"github.com/prometheus/tsdb/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DiskBlock represents a data block backed by on-disk data.
|
||||||
type DiskBlock interface {
|
type DiskBlock interface {
|
||||||
BlockReader
|
BlockReader
|
||||||
|
|
||||||
|
@ -42,6 +43,7 @@ type DiskBlock interface {
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlockReader provides reading access to a data block.
|
||||||
type BlockReader interface {
|
type BlockReader interface {
|
||||||
// Index returns an IndexReader over the block's data.
|
// Index returns an IndexReader over the block's data.
|
||||||
Index() IndexReader
|
Index() IndexReader
|
||||||
|
@ -53,11 +55,6 @@ type BlockReader interface {
|
||||||
Tombstones() TombstoneReader
|
Tombstones() TombstoneReader
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshottable defines an entity that can be backed up online.
|
|
||||||
type Snapshottable interface {
|
|
||||||
Snapshot(dir string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// Appendable defines an entity to which data can be appended.
|
// Appendable defines an entity to which data can be appended.
|
||||||
type Appendable interface {
|
type Appendable interface {
|
||||||
// Appender returns a new Appender against an underlying store.
|
// Appender returns a new Appender against an underlying store.
|
||||||
|
|
|
@ -77,22 +77,6 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
|
||||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||||
|
|
||||||
// uvarintTempStr decodes like uvarintStr but the returned string is
|
|
||||||
// not safe to use if the underlying buffer changes.
|
|
||||||
func (d *decbuf) uvarintTempStr() string {
|
|
||||||
l := d.uvarint64()
|
|
||||||
if d.e != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
if len(d.b) < int(l) {
|
|
||||||
d.e = errInvalidSize
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
s := yoloString(d.b[:l])
|
|
||||||
d.b = d.b[l:]
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *decbuf) uvarintStr() string {
|
func (d *decbuf) uvarintStr() string {
|
||||||
l := d.uvarint64()
|
l := d.uvarint64()
|
||||||
if d.e != nil {
|
if d.e != nil {
|
||||||
|
|
83
index.go
83
index.go
|
@ -232,13 +232,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
||||||
w.toc.labelIndices = w.pos
|
w.toc.labelIndices = w.pos
|
||||||
|
|
||||||
case idxStagePostings:
|
case idxStagePostings:
|
||||||
|
w.toc.postings = w.pos
|
||||||
|
|
||||||
|
case idxStageDone:
|
||||||
w.toc.labelIndicesTable = w.pos
|
w.toc.labelIndicesTable = w.pos
|
||||||
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.toc.postings = w.pos
|
|
||||||
|
|
||||||
case idxStageDone:
|
|
||||||
w.toc.postingsTable = w.pos
|
w.toc.postingsTable = w.pos
|
||||||
if err := w.writeOffsetTable(w.postings); err != nil {
|
if err := w.writeOffsetTable(w.postings); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -404,10 +404,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
|
|
||||||
// writeOffsetTable writes a sequence of readable hash entries.
|
// writeOffsetTable writes a sequence of readable hash entries.
|
||||||
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||||
w.buf1.reset()
|
|
||||||
w.buf1.putBE32int(len(entries))
|
|
||||||
|
|
||||||
w.buf2.reset()
|
w.buf2.reset()
|
||||||
|
w.buf2.putBE32int(len(entries))
|
||||||
|
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
w.buf2.putUvarint(len(e.keys))
|
w.buf2.putUvarint(len(e.keys))
|
||||||
|
@ -417,6 +415,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||||
w.buf2.putUvarint64(e.offset)
|
w.buf2.putUvarint64(e.offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.buf1.reset()
|
||||||
w.buf1.putBE32int(w.buf2.len())
|
w.buf1.putBE32int(w.buf2.len())
|
||||||
w.buf2.putHash(w.crc32)
|
w.buf2.putHash(w.crc32)
|
||||||
|
|
||||||
|
@ -563,6 +562,12 @@ type indexReader struct {
|
||||||
// Cached hashmaps of section offsets.
|
// Cached hashmaps of section offsets.
|
||||||
labels map[string]uint32
|
labels map[string]uint32
|
||||||
postings map[string]uint32
|
postings map[string]uint32
|
||||||
|
// Cache of read symbols. Strings that are returned when reading from the
|
||||||
|
// block are always backed by true strings held in here rather than
|
||||||
|
// strings that are backed by byte slices from the mmap'd index file. This
|
||||||
|
// prevents memory faults when applications work with read symbols after
|
||||||
|
// the block has been unmapped.
|
||||||
|
symbols map[uint32]string
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -579,7 +584,11 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
r := &indexReader{b: f.b, c: f}
|
r := &indexReader{
|
||||||
|
b: f.b,
|
||||||
|
c: f,
|
||||||
|
symbols: map[uint32]string{},
|
||||||
|
}
|
||||||
|
|
||||||
// Verify magic number.
|
// Verify magic number.
|
||||||
if len(f.b) < 4 {
|
if len(f.b) < 4 {
|
||||||
|
@ -592,6 +601,9 @@ func newIndexReader(dir string) (*indexReader, error) {
|
||||||
if err := r.readTOC(); err != nil {
|
if err := r.readTOC(); err != nil {
|
||||||
return nil, errors.Wrap(err, "read TOC")
|
return nil, errors.Wrap(err, "read TOC")
|
||||||
}
|
}
|
||||||
|
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read symbols")
|
||||||
|
}
|
||||||
|
|
||||||
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -623,21 +635,40 @@ func (r *indexReader) decbufAt(off int) decbuf {
|
||||||
return decbuf{b: r.b[off:]}
|
return decbuf{b: r.b[off:]}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
||||||
|
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
|
||||||
|
// after the reader is closed.
|
||||||
|
func (r *indexReader) readSymbols(off int) error {
|
||||||
|
if off == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
d1 = r.decbufAt(int(off))
|
||||||
|
d2 = d1.decbuf(d1.be32int())
|
||||||
|
origLen = d2.len()
|
||||||
|
cnt = d2.be32int()
|
||||||
|
basePos = uint32(off) + 4
|
||||||
|
nextPos = basePos + uint32(origLen-d2.len())
|
||||||
|
)
|
||||||
|
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
||||||
|
s := d2.uvarintStr()
|
||||||
|
r.symbols[uint32(nextPos)] = s
|
||||||
|
|
||||||
|
nextPos = basePos + uint32(origLen-d2.len())
|
||||||
|
cnt--
|
||||||
|
}
|
||||||
|
return d2.err()
|
||||||
|
}
|
||||||
|
|
||||||
// readOffsetTable reads an offset table at the given position and returns a map
|
// readOffsetTable reads an offset table at the given position and returns a map
|
||||||
// with the key strings concatenated by the 0xff unicode non-character.
|
// with the key strings concatenated by the 0xff unicode non-character.
|
||||||
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||||
// A table might not have been written at all, in which case the position
|
|
||||||
// is zeroed out.
|
|
||||||
if off == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const sep = "\xff"
|
const sep = "\xff"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
d1 = r.decbufAt(int(off))
|
d1 = r.decbufAt(int(off))
|
||||||
cnt = d1.be32()
|
|
||||||
d2 = d1.decbuf(d1.be32int())
|
d2 = d1.decbuf(d1.be32int())
|
||||||
|
cnt = d2.be32()
|
||||||
)
|
)
|
||||||
|
|
||||||
res := make(map[string]uint32, 512)
|
res := make(map[string]uint32, 512)
|
||||||
|
@ -647,7 +678,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||||
keys := make([]string, 0, keyCount)
|
keys := make([]string, 0, keyCount)
|
||||||
|
|
||||||
for i := 0; i < keyCount; i++ {
|
for i := 0; i < keyCount; i++ {
|
||||||
keys = append(keys, d2.uvarintTempStr())
|
keys = append(keys, d2.uvarintStr())
|
||||||
}
|
}
|
||||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
||||||
|
|
||||||
|
@ -682,28 +713,20 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||||
d := r.decbufAt(int(o))
|
s, ok := r.symbols[o]
|
||||||
|
if !ok {
|
||||||
s := d.uvarintTempStr()
|
return "", errors.Errorf("unknown symbol offset %d", o)
|
||||||
if d.err() != nil {
|
|
||||||
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
|
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
||||||
d1 := r.decbufAt(int(r.toc.symbols))
|
res := make(map[string]struct{}, len(r.symbols))
|
||||||
d2 := d1.decbuf(d1.be32int())
|
|
||||||
|
|
||||||
count := d2.be32int()
|
for _, s := range r.symbols {
|
||||||
sym := make(map[string]struct{}, count)
|
res[s] = struct{}{}
|
||||||
|
|
||||||
for ; count > 0; count-- {
|
|
||||||
s := d2.uvarintTempStr()
|
|
||||||
sym[s] = struct{}{}
|
|
||||||
}
|
}
|
||||||
|
return res, nil
|
||||||
return sym, d2.err()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
|
|
|
@ -221,7 +221,7 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, len(c))
|
require.Equal(t, 0, len(c))
|
||||||
require.Equal(t, l, series[i])
|
require.Equal(t, series[i], l)
|
||||||
}
|
}
|
||||||
require.NoError(t, p.Err())
|
require.NoError(t, p.Err())
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue