diff --git a/model/textparse/openmetricsparse_test.go b/model/textparse/openmetricsparse_test.go
index d128761e3..f3aa21dfa 100644
--- a/model/textparse/openmetricsparse_test.go
+++ b/model/textparse/openmetricsparse_test.go
@@ -400,7 +400,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
 			require.Equal(t, exp[i].m, string(m))
 			require.Equal(t, exp[i].t, ts)
 			require.Equal(t, exp[i].v, v)
-			require.Equal(t, exp[i].lset, res)
+			testutil.RequireEqual(t, exp[i].lset, res)
 			if exp[i].e == nil {
 				require.False(t, found)
 			} else {
diff --git a/model/textparse/promparse_test.go b/model/textparse/promparse_test.go
index cbfc8aa6c..775e5faa5 100644
--- a/model/textparse/promparse_test.go
+++ b/model/textparse/promparse_test.go
@@ -325,7 +325,7 @@ choices}`, "strange©™\n'quoted' \"name\"", "6"),
 			require.Equal(t, exp[i].m, string(m))
 			require.Equal(t, exp[i].t, ts)
 			require.Equal(t, exp[i].v, v)
-			require.Equal(t, exp[i].lset, res)
+			testutil.RequireEqual(t, exp[i].lset, res)
 
 		case EntryType:
 			m, typ := p.Type()
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index d39989713..513c2ed5a 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -417,7 +417,8 @@ func (db *DB) replayWAL() error {
 
 func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
 	var (
-		dec     record.Decoder
+		syms    = labels.NewSymbolTable() // One table for the whole WAL.
+		dec     = record.NewDecoder(syms)
 		lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
 
 		decoded = make(chan interface{}, 10)
diff --git a/tsdb/head.go b/tsdb/head.go
index b3ca0a148..dd2142e52 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -717,6 +717,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 	h.startWALReplayStatus(startFrom, endAt)
 
+	syms := labels.NewSymbolTable() // One table for the whole WAL.
 	multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
 	if err == nil && startFrom >= snapIdx {
 		sr, err := wlog.NewSegmentsReader(dir)
@@ -731,7 +732,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 		// A corrupted checkpoint is a hard error for now and requires user
 		// intervention. There's likely little data that can be recovered anyway.
-		if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
+		if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
 			return fmt.Errorf("backfill checkpoint: %w", err)
 		}
 		h.updateWALReplayStatusRead(startFrom)
@@ -764,7 +765,7 @@ func (h *Head) Init(minValidTime int64) error {
 		if err != nil {
 			return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
 		}
-		err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
+		err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
 		if err := sr.Close(); err != nil {
 			level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
 		}
@@ -792,7 +793,7 @@ func (h *Head) Init(minValidTime int64) error {
 		}
 
 		sr := wlog.NewSegmentBufReader(s)
-		err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
+		err = h.loadWBL(wlog.NewReader(sr), syms, multiRef, lastMmapRef)
 		if err := sr.Close(); err != nil {
 			level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
 		}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 1be65f134..dd836a537 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -52,7 +52,7 @@ type histogramRecord struct {
 	fh *histogram.FloatHistogram
 }
 
-func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
+func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
 	// Track number of samples that referenced a series we don't know about
 	// for error reporting.
 	var unknownRefs atomic.Uint64
@@ -69,7 +69,6 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		processors     = make([]walSubsetProcessor, concurrency)
 		exemplarsInput chan record.RefExemplar
 
-		dec             record.Decoder
 		shards          = make([][]record.RefSample, concurrency)
 		histogramShards = make([][]histogramRecord, concurrency)
 
@@ -137,6 +136,7 @@ func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 	go func() {
 		defer close(decoded)
 		var err error
+		dec := record.NewDecoder(syms)
 		for r.Next() {
 			rec := r.Record()
 			switch dec.Type(rec) {
@@ -645,7 +645,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 	return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
 }
 
-func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
+func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
 	// Track number of samples, m-map markers, that referenced a series we don't know about
 	// for error reporting.
 	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
@@ -657,7 +657,7 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 		concurrency = h.opts.WALReplayConcurrency
 		processors  = make([]wblSubsetProcessor, concurrency)
 
-		dec    record.Decoder
+		dec    = record.NewDecoder(syms)
 		shards = make([][]record.RefSample, concurrency)
 
 		decodedCh = make(chan interface{}, 10)
@@ -1360,7 +1360,8 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
 		errChan     = make(chan error, concurrency)
 		refSeries   map[chunks.HeadSeriesRef]*memSeries
 		exemplarBuf []record.RefExemplar
-		dec         record.Decoder
+		syms        = labels.NewSymbolTable() // New table for the whole snapshot.
+		dec         = record.NewDecoder(syms)
 	)
 
 	wg.Add(concurrency)
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 84c771684..4241ba828 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -1118,6 +1118,7 @@ type Reader struct {
 	symbols     *Symbols
 	nameSymbols map[uint32]string // Cache of the label name symbol lookups,
 	// as there are not many and they are half of all lookups.
+	st *labels.SymbolTable // TODO: see if we can merge this with nameSymbols.
 
 	dec *Decoder
 
@@ -1177,6 +1178,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
 		b:        b,
 		c:        c,
 		postings: map[string][]postingOffset{},
+		st:       labels.NewSymbolTable(),
 	}
 
 	// Verify header.
@@ -1653,6 +1655,8 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
 	if d.Err() != nil {
 		return d.Err()
 	}
+	builder.SetSymbolTable(r.st)
+	builder.Reset()
 	err := r.dec.Series(d.Get(), builder, chks)
 	if err != nil {
 		return fmt.Errorf("read series: %w", err)
diff --git a/tsdb/record/record.go b/tsdb/record/record.go
index 3931ad05d..8a8409e55 100644
--- a/tsdb/record/record.go
+++ b/tsdb/record/record.go
@@ -192,11 +192,14 @@ type RefMmapMarker struct {
 }
 
 // Decoder decodes series, sample, metadata and tombstone records.
-// The zero value is ready to use.
 type Decoder struct {
 	builder labels.ScratchBuilder
 }
 
+func NewDecoder(t *labels.SymbolTable) Decoder { // FIXME remove t
+	return Decoder{builder: labels.NewScratchBuilder(0)}
+}
+
 // Type returns the type of the record.
 // Returns RecordUnknown if no valid record type is found.
 func (d *Decoder) Type(rec []byte) Type {
diff --git a/tsdb/wal.go b/tsdb/wal.go
index 1509c9cd9..e06a8aea5 100644
--- a/tsdb/wal.go
+++ b/tsdb/wal.go
@@ -31,6 +31,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/encoding"
@@ -859,6 +860,7 @@ func newWALReader(files []*segmentFile, l log.Logger) *walReader {
 		files: files,
 		buf:   make([]byte, 0, 128*4096),
 		crc32: newCRC32(),
+		dec:   record.NewDecoder(labels.NewSymbolTable()),
 	}
 }
 
diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go
index 3d5b56da2..a49ed1a0c 100644
--- a/tsdb/wlog/checkpoint.go
+++ b/tsdb/wlog/checkpoint.go
@@ -28,6 +28,7 @@ import (
 	"github.com/go-kit/log/level"
 	"golang.org/x/exp/slices"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
@@ -154,7 +155,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
 		tstones   []tombstones.Stone
 		exemplars []record.RefExemplar
 		metadata  []record.RefMetadata
-		dec       record.Decoder
+		st        = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
+		dec       = record.NewDecoder(st)
 		enc       record.Encoder
 		buf       []byte
 		recs      [][]byte
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 6994d2dd8..a4c46bbaa 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -29,6 +29,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/exp/slices"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/tsdb/record"
 )
@@ -532,7 +533,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
 // Also used with readCheckpoint - implements segmentReadFn.
 func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	var (
-		dec             record.Decoder
+		dec             = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
 		series          []record.RefSeries
 		samples         []record.RefSample
 		samplesToSend   []record.RefSample
@@ -669,7 +670,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 // Used with readCheckpoint - implements segmentReadFn.
 func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
 	var (
-		dec    record.Decoder
+		dec    = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
 		series []record.RefSeries
 	)
 	for r.Next() && !isClosed(w.quit) {
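
Note (not part of the patch): the change above threads one labels.SymbolTable through every record.Decoder created during WAL/WBL replay, so label strings can be interned per WAL rather than per record. The sketch below illustrates the replay pattern the patch establishes, using only the APIs visible in the hunks (wlog.NewSegmentsReader, wlog.NewReader, labels.NewSymbolTable, record.NewDecoder, Decoder.Type, Decoder.Series). The "data/wal" path and the print loop are hypothetical; also note that, per the FIXME in record.go, NewDecoder does not yet wire the table into the ScratchBuilder, so interning depends on the follow-up change.

// Minimal sketch of reading series records from a WAL with one shared SymbolTable.
package main

import (
	"fmt"
	"log"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/wlog"
)

func main() {
	sr, err := wlog.NewSegmentsReader("data/wal") // hypothetical WAL directory
	if err != nil {
		log.Fatal(err)
	}
	defer sr.Close()

	syms := labels.NewSymbolTable() // one table for the whole WAL
	dec := record.NewDecoder(syms)  // decoder built via the new constructor
	var series []record.RefSeries

	r := wlog.NewReader(sr)
	for r.Next() {
		rec := r.Record()
		if dec.Type(rec) != record.Series {
			continue // skip samples, exemplars, tombstones, etc.
		}
		series, err = dec.Series(rec, series[:0]) // reuse the slice between records
		if err != nil {
			log.Fatal(err)
		}
		for _, s := range series {
			fmt.Println(s.Ref, s.Labels.String())
		}
	}
	if err := r.Err(); err != nil {
		log.Fatal(err)
	}
}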