Browse Source

tsdb: use simpler map key to improve exemplar ingest performance (#10111)

* tsdb: fix exemplar benchmarks

Go benchmarks are expected to do an amount of work that varies with
the `b.N` parameter. Previously these benchmarks would report a result
like 0.01 ns/op, which is nonsense.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* tsdb: use simpler map key to improve exemplar perf

Prometheus holds an index of exemplars so it can discard the oldest one
for a series when a new one is added.
Since the keys are not for human eyes, we can use a simpler format
and save the effort of quoting label values.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Exemplars: allocate index map with estimated size

This avoids Go having to re-size the map several times as it grows.
16 exemplars per series is a guess; if it is too low then the map will
be sparse, while if it is too high then the map will have to resize once
or twice.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/10128/head
Bryan Boreham 3 years ago committed by GitHub
parent
commit
82860a770c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      tsdb/exemplar.go
  2. 73
      tsdb/exemplar_test.go

46
tsdb/exemplar.go

@ -27,8 +27,12 @@ import (
"github.com/prometheus/prometheus/storage"
)
// Indicates that there is no index entry for an exmplar.
const noExemplar = -1
const (
// Indicates that there is no index entry for an exmplar.
noExemplar = -1
// Estimated number of exemplars per series, for sizing the index.
estimatedExemplarsPerSeries = 16
)
type CircularExemplarStorage struct {
lock sync.RWMutex
@ -117,7 +121,7 @@ func NewCircularExemplarStorage(len int64, m *ExemplarMetrics) (ExemplarStorage,
}
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, len),
index: make(map[string]*indexEntry),
index: make(map[string]*indexEntry, len/estimatedExemplarsPerSeries),
metrics: m,
}
@ -202,7 +206,8 @@ Outer:
}
func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error {
seriesLabels := l.String()
var buf [1024]byte
seriesLabels := l.Bytes(buf[:])
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
// Optimize by moving the lock to be per series (& benchmark it).
@ -213,7 +218,7 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.
// Not thread safe. The append parameters tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exemplar, append bool) error {
func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, append bool) error {
if len(ce.exemplars) <= 0 {
return storage.ErrExemplarsDisabled
}
@ -230,7 +235,7 @@ func (ce *CircularExemplarStorage) validateExemplar(l string, e exemplar.Exempla
}
}
idx, ok := ce.index[l]
idx, ok := ce.index[string(key)]
if !ok {
return nil
}
@ -269,7 +274,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
oldNextIndex := int64(ce.nextIndex)
ce.exemplars = make([]*circularBufferEntry, l)
ce.index = make(map[string]*indexEntry)
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries)
ce.nextIndex = 0
// Replay as many entries as needed, starting with oldest first.
@ -305,13 +310,14 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
seriesLabels := entry.ref.seriesLabels.String()
var buf [1024]byte
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:])
idx, ok := ce.index[seriesLabels]
idx, ok := ce.index[string(seriesLabels)]
if !ok {
idx = entry.ref
idx.oldest = ce.nextIndex
ce.index[seriesLabels] = idx
ce.index[string(seriesLabels)] = idx
} else {
entry.ref = idx
ce.exemplars[idx.newest].next = ce.nextIndex
@ -329,7 +335,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
return storage.ErrExemplarsDisabled
}
seriesLabels := l.String()
var buf [1024]byte
seriesLabels := l.Bytes(buf[:])
// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
// Optimize by moving the lock to be per series (& benchmark it).
@ -345,11 +352,11 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
return err
}
_, ok := ce.index[seriesLabels]
_, ok := ce.index[string(seriesLabels)]
if !ok {
ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
} else {
ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex
ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex
}
if prev := ce.exemplars[ce.nextIndex]; prev == nil {
@ -357,12 +364,13 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
} else {
// There exists exemplar already on this ce.nextIndex entry, drop it, to make place
// for others.
prevLabels := prev.ref.seriesLabels.String()
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar {
// Last item for this series, remove index entry.
delete(ce.index, prevLabels)
delete(ce.index, string(prevLabels))
} else {
ce.index[prevLabels].oldest = prev.next
ce.index[string(prevLabels)].oldest = prev.next
}
}
@ -370,8 +378,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].next = noExemplar
ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].ref = ce.index[seriesLabels]
ce.index[seriesLabels].newest = ce.nextIndex
ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)]
ce.index[string(seriesLabels)].newest = ce.nextIndex
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)

73
tsdb/exemplar_test.go

@ -112,7 +112,7 @@ func TestAddExemplar(t *testing.T) {
}
require.NoError(t, es.AddExemplar(l, e))
require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly")
require.Equal(t, es.index[string(l.Bytes(nil))].newest, 0, "exemplar was not stored correctly")
e2 := exemplar.Exemplar{
Labels: labels.Labels{
@ -126,8 +126,8 @@ func TestAddExemplar(t *testing.T) {
}
require.NoError(t, es.AddExemplar(l, e2))
require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar)
require.Equal(t, es.index[string(l.Bytes(nil))].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update")
require.True(t, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[string(l.Bytes(nil))].newest].exemplar)
require.NoError(t, es.AddExemplar(l, e2), "no error is expected attempting to add duplicate exemplar")
@ -300,7 +300,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) {
Ts: int64(101 + i),
})
require.NoError(t, err)
require.Equal(t, es.index[l.String()].newest, i, "exemplar was not stored correctly")
require.Equal(t, es.index[string(l.Bytes(nil))].newest, i, "exemplar was not stored correctly")
}
m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value)
@ -376,14 +376,14 @@ func TestIndexOverwrite(t *testing.T) {
// Ensure index GC'ing is taking place, there should no longer be any
// index entry for series l1 since we just wrote two exemplars for series l2.
_, ok := es.index[l1.String()]
_, ok := es.index[string(l1.Bytes(nil))]
require.False(t, ok)
require.Equal(t, &indexEntry{1, 0, l2}, es.index[l2.String()])
require.Equal(t, &indexEntry{1, 0, l2}, es.index[string(l2.Bytes(nil))])
err = es.AddExemplar(l1, exemplar.Exemplar{Value: 4, Ts: 4})
require.NoError(t, err)
i := es.index[l2.String()]
i := es.index[string(l2.Bytes(nil))]
require.Equal(t, &indexEntry{0, 0, l2}, i)
}
@ -492,18 +492,23 @@ func BenchmarkAddExemplar(b *testing.B) {
for _, n := range []int{10000, 100000, 1000000} {
b.Run(fmt.Sprintf("%d", n), func(b *testing.B) {
exs, err := NewCircularExemplarStorage(int64(n), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
b.ResetTimer()
l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}}
for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}}
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(int64(n), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
l := labels.Labels{{Name: "service", Value: strconv.Itoa(0)}}
b.StartTimer()
for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.Labels{{Name: "service", Value: strconv.Itoa(i)}}
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
if err != nil {
require.NoError(b, err)
}
}
}
})
}
@ -543,24 +548,24 @@ func BenchmarkResizeExemplars(b *testing.B) {
}
for _, tc := range testCases {
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(b *testing.B) {
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(tc.startSize, eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
require.NoError(b, err)
}
saveIndex := es.index
saveExemplars := es.exemplars
b.Run(fmt.Sprintf("%s-%d-to-%d", tc.name, tc.startSize, tc.endSize), func(t *testing.B) {
es.index = saveIndex
es.exemplars = saveExemplars
b.ResetTimer()
es.Resize(tc.endSize)
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
if err != nil {
require.NoError(b, err)
}
}
b.StartTimer()
es.Resize(tc.endSize)
}
})
}
}

Loading…
Cancel
Save