tsdb: don't allow ingesting empty labelsets (#6891)

* tsdb: don't allow ingesting empty labelsets

When we ingest an empty labelset in the head, further blocks can not be
compacted, with the error:

```
level=error ts=2020-02-27T21:26:58.379Z caller=db.go:659 component=tsdb
msg="compaction failed" err="persist head block: write compaction:
add series: out-of-order series added with label set \"{}\" / prev:
\"{}\""
```

We should therefore reject those invalid empty labelsets upfront.

This can be reproduced with the following:

```
cat << END > prometheus.yml
scrape_configs:
  - job_name: 'prometheus'
    scrape_interval: 1s
    basic_auth:
      username: test
      password: test
    metric_relabel_configs:
    - regex: ".*"
      action: labeldrop

    static_configs:
    - targets:
      - 127.0.1.1:9090
END
./prometheus --storage.tsdb.min-block-duration=1m
```
And wait a few minutes.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
pull/6921/head
Julien Pivotto 5 years ago committed by GitHub
parent 2051ae2e6a
commit ed623f69e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -498,6 +498,10 @@ func TestOpenMetricsParseErrors(t *testing.T) {
input: `custom_metric_total 1 # {aa=\"\xff\"} 9.0`,
err: "expected label value, got \"INVALID\"",
},
{
input: `{b="c",} 1`,
err: `"INVALID" "{" is not a valid start token`,
},
}
for i, c := range cases {

@ -265,6 +265,10 @@ func TestPromParseErrors(t *testing.T) {
input: "foo 0 1_2\n",
err: "expected next entry after timestamp, got \"MNAME\"",
},
{
input: `{a="ok"} 1`,
err: `"INVALID" is not a valid start token`,
},
}
for i, c := range cases {

@ -47,6 +47,8 @@ import (
"github.com/prometheus/prometheus/storage"
)
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
var (
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
@ -1154,6 +1156,11 @@ loop:
continue
}
if !lset.Has(labels.MetricName) {
err = errNameLabelMandatory
break loop
}
var ref uint64
ref, err = app.Add(lset, t, v)
switch err {

@ -1587,6 +1587,41 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
}
func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
app := s.Appender()
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
&testScraper{},
nil, nil,
func(l labels.Labels) labels.Labels {
if l.Has("drop") {
return labels.Labels{}
}
return l
},
nopMutator,
func() storage.Appender { return app },
nil,
0,
true,
)
defer cancel()
_, _, _, err := sl.append([]byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
testutil.Equals(t, errNameLabelMandatory, err)
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")
}
func TestReusableConfig(t *testing.T) {
variants := []*config.ScrapeConfig{
&config.ScrapeConfig{

@ -357,12 +357,12 @@ func TestAmendDatapointCausesError(t *testing.T) {
}()
app := db.Appender()
_, err := app.Add(labels.Labels{}, 0, 0)
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
_, err = app.Add(labels.Labels{}, 0, 1)
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
testutil.Equals(t, ErrAmendSample, err)
testutil.Ok(t, app.Rollback())
}
@ -375,12 +375,12 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
}()
app := db.Appender()
_, err := app.Add(labels.Labels{}, 0, math.NaN())
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
_, err = app.Add(labels.Labels{}, 0, math.NaN())
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN())
testutil.Ok(t, err)
}
@ -392,15 +392,28 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
}()
app := db.Appender()
_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
app = db.Appender()
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002))
testutil.Equals(t, ErrAmendSample, err)
}
func TestEmptyLabelsetCausesError(t *testing.T) {
db, closeFn := openTestDB(t, nil, nil)
defer func() {
testutil.Ok(t, db.Close())
closeFn()
}()
app := db.Appender()
_, err := app.Add(labels.Labels{}, 0, 0)
testutil.NotOk(t, err)
testutil.Equals(t, "empty labelset: invalid sample", err.Error())
}
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
db, closeFn := openTestDB(t, nil, nil)
defer func() {

@ -957,6 +957,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if len(lset) == 0 {
return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}

Loading…
Cancel
Save