From 136956cca40b7bf29dc303ad497d01c30534e373 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Fri, 13 Jan 2023 14:30:50 +0100 Subject: [PATCH] Attempt to append ooo sample at the end first (#11615) This is an optimization on the existing append in OOOChunk. What we've been doing so far is find the place inside the out-of-order slice where the new sample should go in and then place it there and move any samples to the right if necessary. This is OK but requires a binary search every time the slice is bigger than 0. The optimization is opinionated and suggests that although out-of-order samples can be out-of-order amongst themselves they'll probably be in order thus we can probably optimistically append at the end and if not do the binary search. OOOChunks are capped to 30 samples by default so this is a small optimization but everything adds up, specially if you handle many active timeseries with out-of-order samples. Signed-off-by: Jesus Vazquez Signed-off-by: Jesus Vazquez Co-authored-by: Ganesh Vernekar --- storage/remote/write_handler_test.go | 61 ++++++++++++++++++++++++++++ tsdb/ooo_head.go | 10 +++++ 2 files changed, 71 insertions(+) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 8ca6af805..58c4439fa 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -21,6 +21,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/go-kit/log" "github.com/stretchr/testify/require" @@ -31,6 +32,7 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" ) func TestRemoteWriteHandler(t *testing.T) { @@ -171,6 +173,65 @@ func TestCommitErr(t *testing.T) { require.Equal(t, "commit error\n", string(body)) } +func BenchmarkRemoteWriteOOOSamples(b *testing.B) { + dir := b.TempDir() + + opts := tsdb.DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds() + + db, err := tsdb.Open(dir, nil, nil, opts, nil) + require.NoError(b, err) + + b.Cleanup(func() { + require.NoError(b, db.Close()) + }) + + handler := NewWriteHandler(log.NewNopLogger(), db.Head()) + + buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) + require.NoError(b, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(b, err) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + require.Equal(b, http.StatusNoContent, recorder.Code) + require.Equal(b, db.Head().NumSeries(), uint64(1000)) + + var bufRequests [][]byte + for i := 0; i < 100; i++ { + buf, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil) + require.NoError(b, err) + bufRequests = append(bufRequests, buf) + } + + b.ResetTimer() + for i := 0; i < 100; i++ { + req, err = http.NewRequest("", "", bytes.NewReader(bufRequests[i])) + require.NoError(b, err) + + recorder = httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + require.Equal(b, http.StatusNoContent, recorder.Code) + require.Equal(b, db.Head().NumSeries(), uint64(1000)) + } +} + +func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries { + var series []prompb.TimeSeries + for i := 0; i < numSeries; i++ { + s := prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "__name__", Value: fmt.Sprintf("test_metric_%d", i)}}, + Samples: []prompb.Sample{{Value: float64(i), Timestamp: ts}}, + } + series = append(series, s) + } + + return series +} + type mockAppendable struct { latestSample int64 samples []mockSample diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index c246ff2e5..63d0b3712 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -36,6 +36,15 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. func (o *OOOChunk) Insert(t int64, v float64) bool { + // Although out-of-order samples can be out-of-order amongst themselves, we + // are opinionated and expect them to be usually in-order meaning we could + // try to append at the end first if the new timestamp is higher than the + // last known timestamp. + if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { + o.samples = append(o.samples, sample{t, v, nil, nil}) + return true + } + // Find index of sample we should replace. i := sort.Search(len(o.samples), func(i int) bool { return o.samples[i].t >= t }) @@ -45,6 +54,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { return true } + // Duplicate sample for timestamp is not allowed. if o.samples[i].t == t { return false }