From 99355443c774b5e681e7d7f0cc1c213bfa55ce11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Tue, 25 Jun 2024 13:25:39 +0200 Subject: [PATCH] remote write handler: reject samples with future timestamps (#14304) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(remote_write): reject samples with future timestamps * increase check to +10 minutes to allow for clock drift --------- Signed-off-by: Jan-Otto Kröpke Signed-off-by: Jan-Otto Kröpke Signed-off-by: Jan-Otto Kröpke Co-authored-by: Bryan Boreham --- storage/remote/write_handler.go | 67 ++++++++++-- storage/remote/write_handler_test.go | 153 ++++++++++++++++++--------- 2 files changed, 164 insertions(+), 56 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index e7515a42b..0832c65ab 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -25,7 +26,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" @@ -38,6 +41,8 @@ type writeHandler struct { samplesWithInvalidLabelsTotal prometheus.Counter } +const maxAheadTime = 10 * time.Minute + // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler { @@ -104,17 +109,22 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err outOfOrderExemplarErrs := 0 samplesWithInvalidLabels := 0 - app := h.appendable.Appender(ctx) + timeLimitApp := &timeLimitAppender{ + Appender: h.appendable.Appender(ctx), + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), + } + defer func() { if err != nil { - _ = app.Rollback() + _ = timeLimitApp.Rollback() return } - err = app.Commit() + err = timeLimitApp.Commit() }() b := labels.NewScratchBuilder(0) var exemplarErr error + for _, ts := range req.Timeseries { labels := LabelProtosToLabels(&b, ts.Labels) if !labels.IsValid() { @@ -124,7 +134,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } var ref storage.SeriesRef for _, s := range ts.Samples { - ref, err = app.Append(ref, labels, s.Timestamp, s.Value) + ref, err = timeLimitApp.Append(ref, labels, s.Timestamp, s.Value) if err != nil { unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { @@ -140,7 +150,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err for _, ep := range ts.Exemplars { e := exemplarProtoToExemplar(&b, ep) - _, exemplarErr = app.AppendExemplar(0, labels, e) + _, exemplarErr = timeLimitApp.AppendExemplar(0, labels, e) exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) if exemplarErr != nil { // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. @@ -151,11 +161,12 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fhs := FloatHistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) } else { hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, hs, nil) } + if err != nil { unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { @@ -233,3 +244,45 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +type timeLimitAppender struct { + storage.Appender + + maxTime int64 +} + +func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if t > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.Append(ref, lset, t, v) + if err != nil { + return 0, err + } + return ref, nil +} + +func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if t > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) + if err != nil { + return 0, err + } + return ref, nil +} + +func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + if e.Ts > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.AppendExemplar(ref, l, e) + if err != nil { + return 0, err + } + return ref, nil +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 1715e92c2..30dc1b3d6 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "net/http/httptest" "strconv" @@ -87,73 +88,127 @@ func TestRemoteWriteHandler(t *testing.T) { } func TestOutOfOrderSample(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestSample: 100, + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latestSample: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + } } // This test case currently aims to verify that the WriteHandler endpoint // don't fail on ingestion errors since the exemplar storage is // still experimental. func TestOutOfOrderExemplar(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestExemplar: 100, + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: tc.Timestamp}}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - resp := recorder.Result() - // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. - require.Equal(t, http.StatusNoContent, resp.StatusCode) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latestExemplar: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. + require.Equal(t, http.StatusNoContent, resp.StatusCode) + }) + } } func TestOutOfOrderHistogram(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, - }}, nil, nil, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - - appendable := &mockAppendable{ - latestHistogram: 100, + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Histograms: []prompb.Histogram{HistogramToHistogramProto(tc.Timestamp, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + + appendable := &mockAppendable{ + latestHistogram: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + } } func BenchmarkRemoteWritehandler(b *testing.B) {