diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index 213fff5c2..7545ab9a6 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -37,7 +37,7 @@ const ( type CircularExemplarStorage struct { lock sync.RWMutex - exemplars []*circularBufferEntry + exemplars []circularBufferEntry nextIndex int metrics *ExemplarMetrics @@ -121,7 +121,7 @@ func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStora length = 0 } c := &CircularExemplarStorage{ - exemplars: make([]*circularBufferEntry, length), + exemplars: make([]circularBufferEntry, length), index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries), metrics: m, } @@ -214,12 +214,12 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar. // Optimize by moving the lock to be per series (& benchmark it). ce.lock.RLock() defer ce.lock.RUnlock() - return ce.validateExemplar(seriesLabels, e, false) + return ce.validateExemplar(ce.index[string(seriesLabels)], e, false) } // Not thread safe. The appended parameters tells us whether this is an external validation, or internal // as a result of an AddExemplar call, in which case we should update any relevant metrics. -func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, appended bool) error { +func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.Exemplar, appended bool) error { if len(ce.exemplars) == 0 { return storage.ErrExemplarsDisabled } @@ -239,8 +239,7 @@ func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemp return err } - idx, ok := ce.index[string(key)] - if !ok { + if idx == nil { return nil } @@ -292,7 +291,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { oldBuffer := ce.exemplars oldNextIndex := int64(ce.nextIndex) - ce.exemplars = make([]*circularBufferEntry, l) + ce.exemplars = make([]circularBufferEntry, l) ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries) ce.nextIndex = 0 @@ -311,10 +310,11 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { // This way we don't migrate exemplars that would just be overwritten when migrating later exemplars. startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer)) + var buf [1024]byte for i := int64(0); i < count; i++ { idx := (startIndex + i) % int64(len(oldBuffer)) - if entry := oldBuffer[idx]; entry != nil { - ce.migrate(entry) + if oldBuffer[idx].ref != nil { + ce.migrate(&oldBuffer[idx], buf[:]) migrated++ } } @@ -328,9 +328,8 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { // migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires // external lock and does not compute metrics. -func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) { - var buf [1024]byte - seriesLabels := entry.ref.seriesLabels.Bytes(buf[:]) +func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) { + seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0]) idx, ok := ce.index[string(seriesLabels)] if !ok { @@ -344,7 +343,7 @@ func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) { idx.newest = ce.nextIndex entry.next = noExemplar - ce.exemplars[ce.nextIndex] = entry + ce.exemplars[ce.nextIndex] = *entry ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) } @@ -362,7 +361,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp ce.lock.Lock() defer ce.lock.Unlock() - err := ce.validateExemplar(seriesLabels, e, true) + idx, ok := ce.index[string(seriesLabels)] + err := ce.validateExemplar(idx, e, true) if err != nil { if errors.Is(err, storage.ErrDuplicateExemplar) { // Duplicate exemplar, noop. @@ -371,25 +371,23 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp return err } - _, ok := ce.index[string(seriesLabels)] if !ok { - ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} + idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} + ce.index[string(seriesLabels)] = idx } else { - ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex + ce.exemplars[idx.newest].next = ce.nextIndex } - if prev := ce.exemplars[ce.nextIndex]; prev == nil { - ce.exemplars[ce.nextIndex] = &circularBufferEntry{} - } else { + if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil { // There exists an exemplar already on this ce.nextIndex entry, // drop it, to make place for others. - var buf [1024]byte - prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) if prev.next == noExemplar { // Last item for this series, remove index entry. + var buf [1024]byte + prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) delete(ce.index, string(prevLabels)) } else { - ce.index[string(prevLabels)].oldest = prev.next + prev.ref.oldest = prev.next } } @@ -397,8 +395,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp // since this is the first exemplar stored for this series. ce.exemplars[ce.nextIndex].next = noExemplar ce.exemplars[ce.nextIndex].exemplar = e - ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)] - ce.index[string(seriesLabels)].newest = ce.nextIndex + ce.exemplars[ce.nextIndex].ref = idx + idx.newest = ce.nextIndex ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) @@ -416,15 +414,15 @@ func (ce *CircularExemplarStorage) computeMetrics() { return } - if next := ce.exemplars[ce.nextIndex]; next != nil { + if ce.exemplars[ce.nextIndex].ref != nil { ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars))) - ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000) + ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[ce.nextIndex].exemplar.Ts) / 1000) return } // We did not yet fill the buffer. ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex)) - if ce.exemplars[0] != nil { + if ce.exemplars[0].ref != nil { ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) } } @@ -438,7 +436,7 @@ func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.L idx := ce.nextIndex l := len(ce.exemplars) for i := 0; i < l; i, idx = i+1, (idx+1)%l { - if ce.exemplars[idx] == nil { + if ce.exemplars[idx].ref == nil { continue } err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar) diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index b2be22170..7723ec389 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -415,27 +415,29 @@ func BenchmarkAddExemplar(b *testing.B) { // before adding. exLabels := labels.FromStrings("trace_id", "89620921") - for _, n := range []int{10000, 100000, 1000000} { - b.Run(strconv.Itoa(n), func(b *testing.B) { - for j := 0; j < b.N; j++ { - b.StopTimer() - exs, err := NewCircularExemplarStorage(int64(n), eMetrics) - require.NoError(b, err) - es := exs.(*CircularExemplarStorage) - var l labels.Labels - b.StartTimer() - - for i := 0; i < n; i++ { - if i%100 == 0 { - l = labels.FromStrings("service", strconv.Itoa(i)) - } - err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}) - if err != nil { - require.NoError(b, err) + for _, capacity := range []int{1000, 10000, 100000} { + for _, n := range []int{10000, 100000, 1000000} { + b.Run(fmt.Sprintf("%d/%d", n, capacity), func(b *testing.B) { + for j := 0; j < b.N; j++ { + b.StopTimer() + exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics) + require.NoError(b, err) + es := exs.(*CircularExemplarStorage) + var l labels.Labels + b.StartTimer() + + for i := 0; i < n; i++ { + if i%100 == 0 { + l = labels.FromStrings("service", strconv.Itoa(i)) + } + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}) + if err != nil { + require.NoError(b, err) + } } } - } - }) + }) + } } } @@ -480,8 +482,11 @@ func BenchmarkResizeExemplars(b *testing.B) { require.NoError(b, err) es := exs.(*CircularExemplarStorage) + var l labels.Labels for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ { - l := labels.FromStrings("service", strconv.Itoa(i)) + if i%100 == 0 { + l = labels.FromStrings("service", strconv.Itoa(i)) + } err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) if err != nil {