diff --git a/tsdb/head.go b/tsdb/head.go
index 5972a9c5d..30ad8139a 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1759,12 +1759,12 @@ type seriesHashmap struct {
 
 func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
 	if s, found := m.unique[hash]; found {
-		if labels.Equal(s.lset, lset) {
+		if labels.Equal(s.labels(), lset) {
 			return s
 		}
 	}
 	for _, s := range m.conflicts[hash] {
-		if labels.Equal(s.lset, lset) {
+		if labels.Equal(s.labels(), lset) {
 			return s
 		}
 	}
@@ -1772,7 +1772,7 @@ func (m *seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
 }
 
 func (m *seriesHashmap) set(hash uint64, s *memSeries) {
-	if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
+	if existing, found := m.unique[hash]; !found || labels.Equal(existing.labels(), s.labels()) {
 		m.unique[hash] = s
 		return
 	}
@@ -1781,7 +1781,7 @@ func (m *seriesHashmap) set(hash uint64, s *memSeries) {
 	}
 	l := m.conflicts[hash]
 	for i, prev := range l {
-		if labels.Equal(prev.lset, s.lset) {
+		if labels.Equal(prev.labels(), s.labels()) {
 			l[i] = s
 			return
 		}
@@ -1931,7 +1931,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
 		s.hashes[hashShard].del(hash, series.ref)
 		delete(s.series[refShard], series.ref)
-		deletedForCallback[series.ref] = series.lset
+		deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
 	}
 
 	s.iterForDeletion(check)
@@ -2023,7 +2023,7 @@
 	}
 	// Setting the series in the s.hashes marks the creation of series
 	// as any further calls to this methods would return that series.
-	s.seriesLifecycleCallback.PostCreation(series.lset)
+	s.seriesLifecycleCallback.PostCreation(series.labels())
 
 	i = uint64(series.ref) & uint64(s.size-1)
 
@@ -2064,16 +2064,19 @@ func (s sample) Type() chunkenc.ValueType {
 // memSeries is the in-memory representation of a series. None of its methods
 // are goroutine safe and it is the caller's responsibility to lock it.
 type memSeries struct {
-	sync.Mutex
-
+	// Members up to the Mutex are not changed after construction, so can be accessed without a lock.
 	ref  chunks.HeadSeriesRef
-	lset labels.Labels
 	meta *metadata.Metadata
 
 	// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
 	// been explicitly enabled in TSDB.
 	shardHash uint64
 
+	// Everything after here should only be accessed with the lock held.
+	sync.Mutex
+
+	lset labels.Labels // Locking required with -tags dedupelabels, not otherwise.
+
 	// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
 	// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
 	//
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 62c3727e2..f45ab606b 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -554,7 +554,7 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
 	// Ensure no empty labels have gotten through.
 	e.Labels = e.Labels.WithoutEmpty()
 
-	err := a.head.exemplars.ValidateExemplar(s.lset, e)
+	err := a.head.exemplars.ValidateExemplar(s.labels(), e)
 	if err != nil {
 		if errors.Is(err, storage.ErrDuplicateExemplar) || errors.Is(err, storage.ErrExemplarsDisabled) {
 			// Duplicate, don't return an error but don't accept the exemplar.
@@ -708,7 +708,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
 		return 0, labels.EmptyLabels()
 	}
 	// returned labels must be suitable to pass to Append()
-	return storage.SeriesRef(s.ref), s.lset
+	return storage.SeriesRef(s.ref), s.labels()
 }
 
 // log writes all headAppender's data to the WAL.
@@ -816,7 +816,7 @@ func (a *headAppender) Commit() (err error) {
 			continue
 		}
 		// We don't instrument exemplar appends here, all is instrumented by storage.
-		if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
+		if err := a.head.exemplars.AddExemplar(s.labels(), e.exemplar); err != nil {
 			if errors.Is(err, storage.ErrOutOfOrderExemplar) {
 				continue
 			}
diff --git a/tsdb/head_dedupelabels.go b/tsdb/head_dedupelabels.go
new file mode 100644
index 000000000..203f92e6a
--- /dev/null
+++ b/tsdb/head_dedupelabels.go
@@ -0,0 +1,27 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build dedupelabels
+
+package tsdb
+
+import (
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+// Helper method to access labels under lock.
+func (s *memSeries) labels() labels.Labels {
+	s.Lock()
+	defer s.Unlock()
+	return s.lset
+}
diff --git a/tsdb/head_other.go b/tsdb/head_other.go
new file mode 100644
index 000000000..9306913d8
--- /dev/null
+++ b/tsdb/head_other.go
@@ -0,0 +1,25 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !dedupelabels
+
+package tsdb
+
+import (
+	"github.com/prometheus/prometheus/model/labels"
+)
+
+// Helper method to access labels; trivial when not using dedupelabels.
+func (s *memSeries) labels() labels.Labels {
+	return s.lset
+}
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index b47e24f9e..9ba8785ad 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -142,7 +142,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
 	}
 
 	slices.SortFunc(series, func(a, b *memSeries) int {
-		return labels.Compare(a.lset, b.lset)
+		return labels.Compare(a.labels(), b.labels())
 	})
 
 	// Convert back to list.
@@ -189,7 +189,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 		h.head.metrics.seriesNotFound.Inc()
 		return storage.ErrNotFound
 	}
-	builder.Assign(s.lset)
+	builder.Assign(s.labels())
 
 	if chks == nil {
 		return nil
@@ -259,7 +259,7 @@ func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef,
 		return "", storage.ErrNotFound
 	}
 
-	value := memSeries.lset.Get(label)
+	value := memSeries.labels().Get(label)
 	if value == "" {
 		return "", storage.ErrNotFound
 	}
@@ -283,7 +283,7 @@ func (h *headIndexReader) LabelNamesFor(ctx context.Context, series index.Postin
 			// when series was garbage collected after the caller got the series IDs.
 			continue
 		}
-		memSeries.lset.Range(func(lbl labels.Label) {
+		memSeries.labels().Range(func(lbl labels.Label) {
 			namesMap[lbl.Name] = struct{}{}
 		})
 	}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 41f7dd46b..787cb7c26 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -126,7 +126,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
 			}
 			// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
 			// replaying the WAL, so lets just log the error if it's not that type.
-			err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
+			err = h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
 			if err != nil && errors.Is(err, storage.ErrOutOfOrderExemplar) {
 				level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
 			}
@@ -448,7 +448,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
 	) {
 		level.Debug(h.logger).Log(
 			"msg", "M-mapped chunks overlap on a duplicate series record",
-			"series", mSeries.lset.String(),
+			"series", mSeries.labels().String(),
 			"oldref", mSeries.ref,
 			"oldmint", mSeries.mmappedChunks[0].minTime,
 			"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
@@ -932,7 +932,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
 
 	buf.PutByte(chunkSnapshotRecordTypeSeries)
 	buf.PutBE64(uint64(s.ref))
-	record.EncodeLabels(&buf, s.lset)
+	record.EncodeLabels(&buf, s.labels())
 	buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
 
 	s.Lock()
@@ -1485,7 +1485,7 @@ Outer:
 					continue
 				}
 
-				if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
+				if err := h.exemplars.AddExemplar(ms.labels(), exemplar.Exemplar{
 					Labels: e.Labels,
 					Value:  e.V,
 					Ts:     e.T,
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index 47972c3cc..4e8329c99 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -78,7 +78,7 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 		oh.head.metrics.seriesNotFound.Inc()
 		return storage.ErrNotFound
 	}
-	builder.Assign(s.lset)
+	builder.Assign(s.labels())
 
 	if chks == nil {
 		return nil
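
Note (not part of the patch above): a minimal, self-contained sketch of the convention this change establishes for memSeries. Fields declared before the embedded sync.Mutex are fixed at construction and may be read without locking; fields after it are read through the labels() helper, which takes the lock only in the dedupelabels build (head_dedupelabels.go is compiled with `go build -tags dedupelabels`, otherwise head_other.go is used). The type and field names below are invented for illustration and are not Prometheus code.

package main

import (
	"fmt"
	"sync"
)

// exampleSeries mirrors the field layout the patch gives memSeries:
// everything above the embedded Mutex is set once at construction and read
// without locking; everything below it is read under the lock.
type exampleSeries struct {
	ref uint64 // immutable after construction, safe to read lock-free

	sync.Mutex
	lset []string // guarded field, standing in for labels.Labels
}

// labels follows the dedupelabels variant of memSeries.labels():
// take the lock, then return the guarded field.
func (s *exampleSeries) labels() []string {
	s.Lock()
	defer s.Unlock()
	return s.lset
}

func main() {
	s := &exampleSeries{ref: 42, lset: []string{"job=demo"}}
	fmt.Println(s.ref, s.labels()) // ref read directly; labels via the accessor
}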