tsdb/agent: Ignore duplicate exemplars (#10595)

* tsdb/agent: Ignore duplicate exemplars

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Make each exemplar unique in TestCommit

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Re-Trigger CI for Windows and UI-related steps

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Change test comment to properly re-trigger pipeline

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Defer Close() calls for test agent and segment reader

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
pull/10609/head
Paschalis Tsilias 3 years ago committed by GitHub
parent 4deb1a90d2
commit 40c1efe8bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -440,6 +440,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
decoded <- samples decoded <- samples
case record.Tombstones, record.Exemplars: case record.Tombstones, record.Exemplars:
// We don't care about tombstones or exemplars during replay. // We don't care about tombstones or exemplars during replay.
// TODO: If decide to decode exemplars, we should make sure to prepopulate
// stripeSeries.exemplars in the next block by using setLatestExemplar.
continue continue
default: default:
errCh <- &wal.CorruptionErr{ errCh <- &wal.CorruptionErr{
@ -789,6 +791,16 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
} }
} }
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
prevExemplar := a.series.GetLatestExemplar(s.ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
a.series.SetLatestExemplar(s.ref, &e)
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref, Ref: s.ref,
T: e.Ts, T: e.Ts,
@ -796,6 +808,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
Labels: e.Labels, Labels: e.Labels,
}) })
a.metrics.totalAppendedExemplars.Inc()
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }

@ -129,7 +129,7 @@ func TestCommit(t *testing.T) {
e := exemplar.Exemplar{ e := exemplar.Exemplar{
Labels: lset, Labels: lset,
Ts: sample[0].T(), Ts: sample[0].T() + int64(i),
Value: sample[0].V(), Value: sample[0].V(),
HasTs: true, HasTs: true,
} }
@ -482,3 +482,56 @@ func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto
return nil return nil
} }
func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
s := createTestAgentDB(t, nil, DefaultOptions())
app := s.Appender(context.Background())
defer s.Close()
sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
require.NoError(t, err, "should not reject valid series")
// Write a few exemplars to our appender and call Commit().
// If the Labels, Value or Timestamp are different than the last exemplar,
// then a new one should be appended; Otherwise, it should be skipped.
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)
e.Labels = labels.Labels{{Name: "b", Value: "2"}}
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)
e.Value = 42
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)
e.Ts = 25
_, _ = app.AppendExemplar(sRef, nil, e)
_, _ = app.AppendExemplar(sRef, nil, e)
require.NoError(t, app.Commit())
// Read back what was written to the WAL.
var walExemplarsCount int
sr, err := wal.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err)
defer sr.Close()
r := wal.NewReader(sr)
var dec record.Decoder
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
}
}
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
require.Equal(t, 4, walExemplarsCount)
}

@ -16,6 +16,7 @@ package agent
import ( import (
"sync" "sync"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
) )
@ -89,10 +90,11 @@ func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
// Filling the padded space with the maps was profiled to be slower - // Filling the padded space with the maps was profiled to be slower -
// likely due to the additional pointer dereferences. // likely due to the additional pointer dereferences.
type stripeSeries struct { type stripeSeries struct {
size int size int
series []map[chunks.HeadSeriesRef]*memSeries series []map[chunks.HeadSeriesRef]*memSeries
hashes []seriesHashmap hashes []seriesHashmap
locks []stripeLock exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
locks []stripeLock
gcMut sync.Mutex gcMut sync.Mutex
} }
@ -105,10 +107,11 @@ type stripeLock struct {
func newStripeSeries(stripeSize int) *stripeSeries { func newStripeSeries(stripeSize int) *stripeSeries {
s := &stripeSeries{ s := &stripeSeries{
size: stripeSize, size: stripeSize,
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize), series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize), hashes: make([]seriesHashmap, stripeSize),
locks: make([]stripeLock, stripeSize), exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize),
locks: make([]stripeLock, stripeSize),
} }
for i := range s.series { for i := range s.series {
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
@ -116,6 +119,9 @@ func newStripeSeries(stripeSize int) *stripeSeries {
for i := range s.hashes { for i := range s.hashes {
s.hashes[i] = seriesHashmap{} s.hashes[i] = seriesHashmap{}
} }
for i := range s.exemplars {
s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{}
}
return s return s
} }
@ -154,6 +160,10 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
delete(s.series[refLock], series.ref) delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref) s.hashes[hashLock].Delete(hash, series.ref)
// Since the series is gone, we'll also delete
// the latest stored exemplar.
delete(s.exemplars[refLock], series.ref)
if hashLock != refLock { if hashLock != refLock {
s.locks[refLock].Unlock() s.locks[refLock].Unlock()
} }
@ -201,3 +211,24 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
s.hashes[hashLock].Set(hash, series) s.hashes[hashLock].Set(hash, series)
s.locks[hashLock].Unlock() s.locks[hashLock].Unlock()
} }
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
i := uint64(ref) & uint64(s.size-1)
s.locks[i].RLock()
exemplar := s.exemplars[i][ref]
s.locks[i].RUnlock()
return exemplar
}
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
i := uint64(ref) & uint64(s.size-1)
// Make sure that's a valid series id and record its latest exemplar
s.locks[i].Lock()
if s.series[i][ref] != nil {
s.exemplars[i][ref] = exemplar
}
s.locks[i].Unlock()
}

Loading…
Cancel
Save