Browse Source

Merge pull request #14080 from bboreham/faster-exemplars

[ENHANCEMENT] TSDB: Faster exemplars
pull/14083/head^2
Ganesh Vernekar 6 months ago committed by GitHub
parent
commit
e47474d755
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 56
      tsdb/exemplar.go
  2. 45
      tsdb/exemplar_test.go

56
tsdb/exemplar.go

@ -37,7 +37,7 @@ const (
type CircularExemplarStorage struct {
lock sync.RWMutex
exemplars []*circularBufferEntry
exemplars []circularBufferEntry
nextIndex int
metrics *ExemplarMetrics
@ -121,7 +121,7 @@ func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStora
length = 0
}
c := &CircularExemplarStorage{
exemplars: make([]*circularBufferEntry, length),
exemplars: make([]circularBufferEntry, length),
index: make(map[string]*indexEntry, length/estimatedExemplarsPerSeries),
metrics: m,
}
@ -214,12 +214,12 @@ func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.
// Optimize by moving the lock to be per series (& benchmark it).
ce.lock.RLock()
defer ce.lock.RUnlock()
return ce.validateExemplar(seriesLabels, e, false)
return ce.validateExemplar(ce.index[string(seriesLabels)], e, false)
}
// Not thread safe. The appended parameters tells us whether this is an external validation, or internal
// as a result of an AddExemplar call, in which case we should update any relevant metrics.
func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemplar, appended bool) error {
func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.Exemplar, appended bool) error {
if len(ce.exemplars) == 0 {
return storage.ErrExemplarsDisabled
}
@ -239,8 +239,7 @@ func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemp
return err
}
idx, ok := ce.index[string(key)]
if !ok {
if idx == nil {
return nil
}
@ -292,7 +291,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
oldBuffer := ce.exemplars
oldNextIndex := int64(ce.nextIndex)
ce.exemplars = make([]*circularBufferEntry, l)
ce.exemplars = make([]circularBufferEntry, l)
ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries)
ce.nextIndex = 0
@ -311,10 +310,11 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer))
var buf [1024]byte
for i := int64(0); i < count; i++ {
idx := (startIndex + i) % int64(len(oldBuffer))
if entry := oldBuffer[idx]; entry != nil {
ce.migrate(entry)
if oldBuffer[idx].ref != nil {
ce.migrate(&oldBuffer[idx], buf[:])
migrated++
}
}
@ -328,9 +328,8 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
// migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
// external lock and does not compute metrics.
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
var buf [1024]byte
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:])
func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) {
seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0])
idx, ok := ce.index[string(seriesLabels)]
if !ok {
@ -344,7 +343,7 @@ func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry) {
idx.newest = ce.nextIndex
entry.next = noExemplar
ce.exemplars[ce.nextIndex] = entry
ce.exemplars[ce.nextIndex] = *entry
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
}
@ -362,7 +361,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
ce.lock.Lock()
defer ce.lock.Unlock()
err := ce.validateExemplar(seriesLabels, e, true)
idx, ok := ce.index[string(seriesLabels)]
err := ce.validateExemplar(idx, e, true)
if err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) {
// Duplicate exemplar, noop.
@ -371,25 +371,23 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
return err
}
_, ok := ce.index[string(seriesLabels)]
if !ok {
ce.index[string(seriesLabels)] = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l}
ce.index[string(seriesLabels)] = idx
} else {
ce.exemplars[ce.index[string(seriesLabels)].newest].next = ce.nextIndex
ce.exemplars[idx.newest].next = ce.nextIndex
}
if prev := ce.exemplars[ce.nextIndex]; prev == nil {
ce.exemplars[ce.nextIndex] = &circularBufferEntry{}
} else {
if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil {
// There exists an exemplar already on this ce.nextIndex entry,
// drop it, to make place for others.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
if prev.next == noExemplar {
// Last item for this series, remove index entry.
var buf [1024]byte
prevLabels := prev.ref.seriesLabels.Bytes(buf[:])
delete(ce.index, string(prevLabels))
} else {
ce.index[string(prevLabels)].oldest = prev.next
prev.ref.oldest = prev.next
}
}
@ -397,8 +395,8 @@ func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemp
// since this is the first exemplar stored for this series.
ce.exemplars[ce.nextIndex].next = noExemplar
ce.exemplars[ce.nextIndex].exemplar = e
ce.exemplars[ce.nextIndex].ref = ce.index[string(seriesLabels)]
ce.index[string(seriesLabels)].newest = ce.nextIndex
ce.exemplars[ce.nextIndex].ref = idx
idx.newest = ce.nextIndex
ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars)
@ -416,15 +414,15 @@ func (ce *CircularExemplarStorage) computeMetrics() {
return
}
if next := ce.exemplars[ce.nextIndex]; next != nil {
if ce.exemplars[ce.nextIndex].ref != nil {
ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars)))
ce.metrics.lastExemplarsTs.Set(float64(next.exemplar.Ts) / 1000)
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[ce.nextIndex].exemplar.Ts) / 1000)
return
}
// We did not yet fill the buffer.
ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex))
if ce.exemplars[0] != nil {
if ce.exemplars[0].ref != nil {
ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000)
}
}
@ -438,7 +436,7 @@ func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.L
idx := ce.nextIndex
l := len(ce.exemplars)
for i := 0; i < l; i, idx = i+1, (idx+1)%l {
if ce.exemplars[idx] == nil {
if ce.exemplars[idx].ref == nil {
continue
}
err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar)

45
tsdb/exemplar_test.go

@ -415,27 +415,29 @@ func BenchmarkAddExemplar(b *testing.B) {
// before adding.
exLabels := labels.FromStrings("trace_id", "89620921")
for _, n := range []int{10000, 100000, 1000000} {
b.Run(strconv.Itoa(n), func(b *testing.B) {
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(int64(n), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
var l labels.Labels
b.StartTimer()
for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
if err != nil {
require.NoError(b, err)
for _, capacity := range []int{1000, 10000, 100000} {
for _, n := range []int{10000, 100000, 1000000} {
b.Run(fmt.Sprintf("%d/%d", n, capacity), func(b *testing.B) {
for j := 0; j < b.N; j++ {
b.StopTimer()
exs, err := NewCircularExemplarStorage(int64(capacity), eMetrics)
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
var l labels.Labels
b.StartTimer()
for i := 0; i < n; i++ {
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i), Labels: exLabels})
if err != nil {
require.NoError(b, err)
}
}
}
}
})
})
}
}
}
@ -480,8 +482,11 @@ func BenchmarkResizeExemplars(b *testing.B) {
require.NoError(b, err)
es := exs.(*CircularExemplarStorage)
var l labels.Labels
for i := 0; i < int(float64(tc.startSize)*float64(1.5)); i++ {
l := labels.FromStrings("service", strconv.Itoa(i))
if i%100 == 0 {
l = labels.FromStrings("service", strconv.Itoa(i))
}
err = es.AddExemplar(l, exemplar.Exemplar{Value: float64(i), Ts: int64(i)})
if err != nil {

Loading…
Cancel
Save