diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index 516b538e1..3718d9591 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -27,8 +27,12 @@ import ( "github.com/prometheus/prometheus/storage" ) -// Indicates that there is no index entry for an exmplar. -const noExemplar = -1 +const ( + // Indicates that there is no index entry for an exmplar. + noExemplar = -1 + // Estimated number of exemplars per series, for sizing the index. + estimatedExemplarsPerSeries = 16 +) type CircularExemplarStorage struct { lock sync.RWMutex @@ -117,7 +121,7 @@ func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage, } c := &CircularExemplarStorage{ exemplars: make([]*circularBufferEntry, len), - index: make(map[string]*indexEntry), + index: make(map[string]*indexEntry, len/estimatedExemplarsPerSeries), metrics: m, } @@ -202,7 +206,8 @@ Outer: } func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { - seriesLabels := l.String() + var buf [1024]byte + seriesLabels := l.Bytes(buf[:]) // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. // Optimize by moving the lock to be per series (& benchmark it). @@ -213,7 +218,7 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar. // Not thread safe. The append parameters tells us whether this is an external validation, or internal // as a result of an AddExemplar call, in which case we should update any relevant metrics. -func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error { +func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, append bool) error { if len(ce.exemplars) <= 0 { return storage.ErrExemplarsDisabled } @@ -230,7 +235,7 @@ func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exempla } } - idx, ok := ce.index[l] + idx, ok := ce.index[string(key)] if !ok { return nil } @@ -269,7 +274,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { oldNextIndex := int64(ce.nextIndex) ce.exemplars = make([]*circularBufferEntry, l) - ce.index = make(map[string]*indexEntry) + ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries) ce.nextIndex = 0 // Replay as many entries as needed, starting with oldest first. @@ -305,13 +310,14 @@ func (ce *CircularExemplarStorage) Resize(l int64) int { // migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires // external lock and does not compute metrics. func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) { - seriesLabels := entry.ref.seriesLabels.String() + var buf [1024]byte + seriesLabels := entry.ref.seriesLabels.Bytes(buf[:]) - idx, ok := ce.index[seriesLabels] + idx, ok := ce.index[string(seriesLabels)] if !ok { idx = entry.ref idx.oldest = ce.nextIndex - ce.index[seriesLabels] = idx + ce.index[string(seriesLabels)] = idx } else { entry.ref = idx ce.exemplars[idx.newest].next = ce.nextIndex @@ -329,7 +335,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp return storage.ErrExemplarsDisabled } - seriesLabels := l.String() + var buf [1024]byte + seriesLabels := l.Bytes(buf[:]) // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. // Optimize by moving the lock to be per series (& benchmark it). @@ -345,11 +352,11 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp return err } - _, ok := ce.index[seriesLabels] + _, ok := ce.index[string(seriesLabels)] if !ok { - ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} + ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} } else { - ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex + ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex } if prev := ce.exemplars[ce.nextIndex]; prev == nil { @@ -357,12 +364,13 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp } else { // There exists exemplar already on this ce.nextIndex entry, drop it, to make place // for others. - prevLabels := prev.ref.seriesLabels.String() + var buf [1024]byte + prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) if prev.next == noExemplar { // Last item for this series, remove index entry. - delete(ce.index, prevLabels) + delete(ce.index, string(prevLabels)) } else { - ce.index[prevLabels].oldest = prev.next + ce.index[string(prevLabels)].oldest = prev.next } } @@ -370,8 +378,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp // since this is the first exemplar stored for this series. ce.exemplars[ce.nextIndex].next = noExemplar ce.exemplars[ce.nextIndex].exemplar = e - ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels] - ce.index[seriesLabels].newest = ce.nextIndex + ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)] + ce.index[string(seriesLabels)].newest = ce.nextIndex ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index 1418dcca9..30c497426 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -112,7 +112,7 @@ func TestAddExemplar(t *testing.T) { } require.NoError(t, es.AddExemplar(l, e)) - require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly") + require.Equal(t, es.index[string(l.Bytes(nil))].newest, 0, "exemplar was not stored correctly") e2 := exemplar.Exemplar{ Labels: labels.Labels{ @@ -126,8 +126,8 @@ func TestAddExemplar(t *testing.T) { } require.NoError(t, es.AddExemplar(l, e2)) - require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") - require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar) + require.Equal(t, es.index[string(l.Bytes(nil))].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") + require.True(t, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar) require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar") @@ -300,7 +300,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) { Ts: int64(101 + i), }) require.NoError(t, err) - require.Equal(t, es.index[l.String()].newest, i, "exemplar was not stored correctly") + require.Equal(t, es.index[string(l.Bytes(nil))].newest, i, "exemplar was not stored correctly") } m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value) @@ -376,14 +376,14 @@ func TestIndexOverwrite(t *testing.T) { // Ensure index GC'ing is taking place, there should no longer be any // index entry for series l1 since we just wrote two exemplars for series l2. - _, ok := es.index[l1.String()] + _, ok := es.index[string(l1.Bytes(nil))] require.False(t, ok) - require.Equal(t, &indexEntry{1, 0, l2}, es.index[l2.String()]) + require.Equal(t, &indexEntry{1, 0, l2}, es.index[string(l2.Bytes(nil))]) err = es.AddExemplar(l1, exemplar.Exemplar{Value: 4, Ts: 4}) require.NoError(t, err) - i := es.index[l2.String()] + i := es.index[string(l2.Bytes(nil))] require.Equal(t, &indexEntry{0, 0, l2}, i) } @@ -492,18 +492,23 @@ func BenchmarkAddExemplar(b *testing.B) { for _, n := range []int{10000, 100000, 1000000} { b.Run(fmt.Sprintf("%d", n), func(b *testing.B) { - exs, err := NewCircularExemplarStorage(int64(n), eMetrics) - require.NoError(b, err) - es := exs.(*CircularExemplarStorage) - - b.ResetTimer() - l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}} - for i := 0; i < n; i++ { - if i%100 == 0 { - l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}} - } - err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}) + for j := 0; j < b.N; j++ { + b.StopTimer() + exs, err := NewCircularExemplarStorage(int64(n), eMetrics) require.NoError(b, err) + es := exs.(*CircularExemplarStorage) + l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}} + b.StartTimer() + + for i := 0; i < n; i++ { + if i%100 == 0 { + l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}} + } + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels}) + if err != nil { + require.NoError(b, err) + } + } } }) } @@ -543,24 +548,24 @@ func BenchmarkResizeExemplars(b *testing.B) { } for _, tc := range testCases { - exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) - require.NoError(b, err) - es := exs.(*CircularExemplarStorage) + b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(b *testing.B) { + for j := 0; j < b.N; j++ { + b.StopTimer() + exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics) + require.NoError(b, err) + es := exs.(*CircularExemplarStorage) - for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ { - l := labels.FromStrings("service", strconv.Itoa(i)) + for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ { + l := labels.FromStrings("service", strconv.Itoa(i)) - err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) - require.NoError(b, err) - } - saveIndex := es.index - saveExemplars := es.exemplars - - b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(t *testing.B) { - es.index = saveIndex - es.exemplars = saveExemplars - b.ResetTimer() - es.Resize(tc.endSize) + err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)}) + if err != nil { + require.NoError(b, err) + } + } + b.StartTimer() + es.Resize(tc.endSize) + } }) } }