tsdb/head: wlog exemplars after samples (#13113)

When samples are committed in the head, they are also written to the WAL.
The order of WAL records should be sample then exemplar, but this was
not the case for native histogram samples. This PR fixes that.

The problem with the wrong order is that remote write reads the WAL and
sends the recorded timeseries in the WAL order, which means exemplars
arrived before histogram samples. If the receiving side is Prometheus
TSDB and the series did not exist before, the exemplar does not
currently create the series, which means the exemplar is rejected and lost.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
pull/13092/head^2
George Krajcsovits 1 year ago committed by GitHub
parent e250f09b5d
commit 39a35d92bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -689,14 +689,6 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log samples") return errors.Wrap(err, "log samples")
} }
} }
if len(a.exemplars) > 0 {
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log exemplars")
}
}
if len(a.histograms) > 0 { if len(a.histograms) > 0 {
rec = enc.HistogramSamples(a.histograms, buf) rec = enc.HistogramSamples(a.histograms, buf)
buf = rec[:0] buf = rec[:0]
@ -711,6 +703,18 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log float histograms") return errors.Wrap(err, "log float histograms")
} }
} }
// Exemplars should be logged after samples (float/native histogram/etc),
// otherwise it might happen that we send the exemplars in a remote write
// batch before the samples, which in turn means the exemplar is rejected
// for missing series, since series are created due to samples.
if len(a.exemplars) > 0 {
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log exemplars")
}
}
return nil return nil
} }

@ -22,6 +22,7 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"reflect"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -190,6 +191,10 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
meta, err := dec.Metadata(rec, nil) meta, err := dec.Metadata(rec, nil)
require.NoError(t, err) require.NoError(t, err)
recs = append(recs, meta) recs = append(recs, meta)
case record.Exemplars:
exemplars, err := dec.Exemplars(rec, nil)
require.NoError(t, err)
recs = append(recs, exemplars)
default: default:
t.Fatalf("unknown record type") t.Fatalf("unknown record type")
} }
@ -5457,3 +5462,55 @@ func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
require.Equal(t, numSamples/2, storedSampleCount) require.Equal(t, numSamples/2, storedSampleCount)
} }
// TestWALSampleAndExemplarOrder checks the order of WAL records written by a
// single commit that contains one sample and one exemplar for a new series.
// For each sample kind (float, histogram, float histogram) the WAL must hold
// exactly three records: the series record, then the sample record of the
// matching type, then the exemplar record.
func TestWALSampleAndExemplarOrder(t *testing.T) {
	lbls := labels.FromStrings("foo", "bar")
	testcases := map[string]struct {
		// appendF appends one sample of the kind under test at timestamp ts.
		appendF func(app storage.Appender, ts int64) (storage.SeriesRef, error)
		// expectedType is the Go type of the decoded sample record.
		expectedType reflect.Type
	}{
		"float sample": {
			appendF: func(app storage.Appender, ts int64) (storage.SeriesRef, error) {
				return app.Append(0, lbls, ts, 1.0)
			},
			expectedType: reflect.TypeOf([]record.RefSample{}),
		},
		"histogram sample": {
			appendF: func(app storage.Appender, ts int64) (storage.SeriesRef, error) {
				return app.AppendHistogram(0, lbls, ts, tsdbutil.GenerateTestHistogram(1), nil)
			},
			expectedType: reflect.TypeOf([]record.RefHistogramSample{}),
		},
		"float histogram sample": {
			appendF: func(app storage.Appender, ts int64) (storage.SeriesRef, error) {
				return app.AppendHistogram(0, lbls, ts, nil, tsdbutil.GenerateTestFloatHistogram(1))
			},
			expectedType: reflect.TypeOf([]record.RefFloatHistogramSample{}),
		},
	}
	for testName, tc := range testcases {
		t.Run(testName, func(t *testing.T) {
			h, w := newTestHead(t, 1000, wlog.CompressionNone, false)
			defer func() {
				require.NoError(t, h.Close())
			}()

			app := h.Appender(context.Background())
			ref, err := tc.appendF(app, 10)
			require.NoError(t, err)

			// Check the exemplar append and the commit explicitly: if either
			// fails, the record-count assertion below would fail with a
			// message that hides the real cause.
			_, err = app.AppendExemplar(ref, lbls, exemplar.Exemplar{Value: 1.0, Ts: 5})
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			recs := readTestWAL(t, w.Dir())
			require.Len(t, recs, 3)

			_, ok := recs[0].([]record.RefSeries)
			require.True(t, ok, "expected first record to be a RefSeries")

			actualType := reflect.TypeOf(recs[1])
			require.Equal(t, tc.expectedType, actualType, "expected second record to be a %s", tc.expectedType)

			_, ok = recs[2].([]record.RefExemplar)
			require.True(t, ok, "expected third record to be a RefExemplar")
		})
	}
}

Loading…
Cancel
Save