tsdb: create SymbolTables for labels as required

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/12304/head
Bryan Boreham 2 years ago
parent 2ac1632eec
commit 93b72ec5dd

@ -400,7 +400,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].t, ts)
require.Equal(t, exp[i].v, v)
require.Equal(t, exp[i].lset, res)
testutil.RequireEqual(t, exp[i].lset, res)
if exp[i].e == nil {
require.False(t, found)
} else {

@ -325,7 +325,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
require.Equal(t, exp[i].m, string(m))
require.Equal(t, exp[i].t, ts)
require.Equal(t, exp[i].v, v)
require.Equal(t, exp[i].lset, res)
testutil.RequireEqual(t, exp[i].lset, res)
case EntryType:
m, typ := p.Type()

@ -417,7 +417,8 @@ func (db *DB) replayWAL() error {
func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
var (
dec record.Decoder
syms = labels.NewSymbolTable() // One table for the whole WAL.
dec = record.NewDecoder(syms)
lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
decoded = make(chan interface{}, 10)

@ -717,6 +717,7 @@ func (h *Head) Init(minValidTime int64) error {
h.startWALReplayStatus(startFrom, endAt)
syms := labels.NewSymbolTable() // One table for the whole WAL.
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil && startFrom >= snapIdx {
sr, err := wlog.NewSegmentsReader(dir)
@ -731,7 +732,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
return fmt.Errorf("backfill checkpoint: %w", err)
}
h.updateWALReplayStatusRead(startFrom)
@ -764,7 +765,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil {
return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
}
err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
}
@ -792,7 +793,7 @@ func (h *Head) Init(minValidTime int64) error {
}
sr := wlog.NewSegmentBufReader(s)
err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
err = h.loadWBL(wlog.NewReader(sr), syms, multiRef, lastMmapRef)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
}

@ -52,7 +52,7 @@ type histogramRecord struct {
fh *histogram.FloatHistogram
}
func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs atomic.Uint64
@ -69,7 +69,6 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
processors = make([]walSubsetProcessor, concurrency)
exemplarsInput chan record.RefExemplar
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
@ -137,6 +136,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
go func() {
defer close(decoded)
var err error
dec := record.NewDecoder(syms)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
@ -645,7 +645,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
}
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, m-map markers, that referenced a series we don't know about
// for error reporting.
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
@ -657,7 +657,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec record.Decoder
dec = record.NewDecoder(syms)
shards = make([][]record.RefSample, concurrency)
decodedCh = make(chan interface{}, 10)
@ -1360,7 +1360,8 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
errChan = make(chan error, concurrency)
refSeries map[chunks.HeadSeriesRef]*memSeries
exemplarBuf []record.RefExemplar
dec record.Decoder
syms = labels.NewSymbolTable() // New table for the whole snapshot.
dec = record.NewDecoder(syms)
)
wg.Add(concurrency)

@ -1118,6 +1118,7 @@ type Reader struct {
symbols *Symbols
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
// as there are not many and they are half of all lookups.
st *labels.SymbolTable // TODO: see if we can merge this with nameSymbols.
dec *Decoder
@ -1177,6 +1178,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
b: b,
c: c,
postings: map[string][]postingOffset{},
st: labels.NewSymbolTable(),
}
// Verify header.
@ -1653,6 +1655,8 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
if d.Err() != nil {
return d.Err()
}
builder.SetSymbolTable(r.st)
builder.Reset()
err := r.dec.Series(d.Get(), builder, chks)
if err != nil {
return fmt.Errorf("read series: %w", err)

@ -192,11 +192,14 @@ type RefMmapMarker struct {
}
// Decoder decodes series, sample, metadata and tombstone records.
// The zero value is ready to use.
type Decoder struct {
builder labels.ScratchBuilder
}
func NewDecoder(t *labels.SymbolTable) Decoder { // FIXME remove t
return Decoder{builder: labels.NewScratchBuilder(0)}
}
// Type returns the type of the record.
// Returns RecordUnknown if no valid record type is found.
func (d *Decoder) Type(rec []byte) Type {

@ -31,6 +31,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
@ -859,6 +860,7 @@ func newWALReader(files []*segmentFile, l log.Logger) *walReader {
files: files,
buf: make([]byte, 0, 128*4096),
crc32: newCRC32(),
dec: record.NewDecoder(labels.NewSymbolTable()),
}
}

@ -28,6 +28,7 @@ import (
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
@ -154,7 +155,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
tstones []tombstones.Stone
exemplars []record.RefExemplar
metadata []record.RefMetadata
dec record.Decoder
st = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
dec = record.NewDecoder(st)
enc record.Encoder
buf []byte
recs [][]byte

@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/record"
)
@ -532,7 +533,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var (
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
@ -669,7 +670,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
var (
dec record.Decoder
dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
series []record.RefSeries
)
for r.Next() && !isClosed(w.quit) {

Loading…
Cancel
Save