mirror of https://github.com/prometheus/prometheus
Merge pull request #11688 from damnever/fix/datamodelvalidation-remotewriteapi
Validate the metric names and labels in the remote write handlerpull/12512/head
commit
986fde06b2
|
@ -22,6 +22,8 @@ import (
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/exemplar"
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
@ -30,15 +32,28 @@ import (
|
||||||
type writeHandler struct {
|
type writeHandler struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
appendable storage.Appendable
|
appendable storage.Appendable
|
||||||
|
|
||||||
|
samplesWithInvalidLabelsTotal prometheus.Counter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWriteHandler creates a http.Handler that accepts remote write requests and
|
// NewWriteHandler creates a http.Handler that accepts remote write requests and
|
||||||
// writes them to the provided appendable.
|
// writes them to the provided appendable.
|
||||||
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
|
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler {
|
||||||
return &writeHandler{
|
h := &writeHandler{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
appendable: appendable,
|
appendable: appendable,
|
||||||
|
|
||||||
|
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "prometheus",
|
||||||
|
Subsystem: "api",
|
||||||
|
Name: "remote_write_invalid_labels_samples_total",
|
||||||
|
Help: "The total number of remote write samples which contains invalid labels.",
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
if reg != nil {
|
||||||
|
reg.MustRegister(h.samplesWithInvalidLabelsTotal)
|
||||||
|
}
|
||||||
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -85,6 +100,7 @@ func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar,
|
||||||
|
|
||||||
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
|
||||||
outOfOrderExemplarErrs := 0
|
outOfOrderExemplarErrs := 0
|
||||||
|
samplesWithInvalidLabels := 0
|
||||||
|
|
||||||
app := h.appendable.Appender(ctx)
|
app := h.appendable.Appender(ctx)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -98,6 +114,11 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
var exemplarErr error
|
var exemplarErr error
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
labels := labelProtosToLabels(ts.Labels)
|
labels := labelProtosToLabels(ts.Labels)
|
||||||
|
if !labels.IsValid() {
|
||||||
|
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
|
||||||
|
samplesWithInvalidLabels++
|
||||||
|
continue
|
||||||
|
}
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
_, err = app.Append(0, labels, s.Timestamp, s.Value)
|
_, err = app.Append(0, labels, s.Timestamp, s.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -150,6 +171,9 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
||||||
if outOfOrderExemplarErrs > 0 {
|
if outOfOrderExemplarErrs > 0 {
|
||||||
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
|
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
|
||||||
}
|
}
|
||||||
|
if samplesWithInvalidLabels > 0 {
|
||||||
|
h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -43,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
appendable := &mockAppendable{}
|
appendable := &mockAppendable{}
|
||||||
handler := NewWriteHandler(nil, appendable)
|
handler := NewWriteHandler(nil, nil, appendable)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -94,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) {
|
||||||
appendable := &mockAppendable{
|
appendable := &mockAppendable{
|
||||||
latestSample: 100,
|
latestSample: 100,
|
||||||
}
|
}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -119,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
|
||||||
appendable := &mockAppendable{
|
appendable := &mockAppendable{
|
||||||
latestExemplar: 100,
|
latestExemplar: 100,
|
||||||
}
|
}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -142,7 +144,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
|
||||||
appendable := &mockAppendable{
|
appendable := &mockAppendable{
|
||||||
latestHistogram: 100,
|
latestHistogram: 100,
|
||||||
}
|
}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -151,6 +153,34 @@ func TestOutOfOrderHistogram(t *testing.T) {
|
||||||
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkRemoteWritehandler(b *testing.B) {
|
||||||
|
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
|
||||||
|
reqs := []*http.Request{}
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
num := strings.Repeat(strconv.Itoa(i), 16)
|
||||||
|
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "test_metric"},
|
||||||
|
{Name: "test_label_name_" + num, Value: labelValue + num},
|
||||||
|
},
|
||||||
|
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
|
||||||
|
}}, nil, nil, nil)
|
||||||
|
require.NoError(b, err)
|
||||||
|
req, err := http.NewRequest("", "", bytes.NewReader(buf))
|
||||||
|
require.NoError(b, err)
|
||||||
|
reqs = append(reqs, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
appendable := &mockAppendable{}
|
||||||
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
|
||||||
|
recorder := httptest.NewRecorder()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for _, req := range reqs {
|
||||||
|
handler.ServeHTTP(recorder, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCommitErr(t *testing.T) {
|
func TestCommitErr(t *testing.T) {
|
||||||
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
|
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -161,7 +191,7 @@ func TestCommitErr(t *testing.T) {
|
||||||
appendable := &mockAppendable{
|
appendable := &mockAppendable{
|
||||||
commitErr: fmt.Errorf("commit error"),
|
commitErr: fmt.Errorf("commit error"),
|
||||||
}
|
}
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), appendable)
|
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
handler.ServeHTTP(recorder, req)
|
handler.ServeHTTP(recorder, req)
|
||||||
|
@ -187,7 +217,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
|
||||||
require.NoError(b, db.Close())
|
require.NoError(b, db.Close())
|
||||||
})
|
})
|
||||||
|
|
||||||
handler := NewWriteHandler(log.NewNopLogger(), db.Head())
|
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
|
||||||
|
|
||||||
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
|
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|
|
@ -285,7 +285,7 @@ func NewAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
if ap != nil {
|
if ap != nil {
|
||||||
a.remoteWriteHandler = remote.NewWriteHandler(logger, ap)
|
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap)
|
||||||
}
|
}
|
||||||
|
|
||||||
return a
|
return a
|
||||||
|
|
Loading…
Reference in New Issue