Add sample_limit to scrape config.

This imposes a hard limit on the number of samples ingested from the
target. The limit is applied after metric relabelling, so that
problematic metrics can be dropped first.
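
As a rough sketch of how this interacts with relabelling (the job name
and relabel rule below are only illustrative; sample_limit and
metric_relabel_configs are the actual configuration keys), a scrape
config could look like:

    scrape_configs:
      - job_name: example              # illustrative name
        sample_limit: 1000             # scrape is rejected if more samples remain after relabelling
        metric_relabel_configs:
          - source_labels: [__name__]
            regex: 'a.*'               # drop known-problematic series before the limit is checked
            action: drop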

This is intended as a very blunt tool to prevent overload due to
misbehaving targets that suddenly jump in sample count (e.g. adding
a label containing email addresses).

Add a metric to track how often this happens.
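
(As a usage sketch, not part of this change: the new counter could be
watched with a query such as

    rate(prometheus_target_scrapes_exceeded_sample_limit_total[5m]) > 0

to notice when scrapes are being rejected by the limit.)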

Fixes #2137
pull/2288/head
Brian Brazil 8 years ago
parent c8de1484d5
commit 30448286c7

@@ -497,6 +497,8 @@ type ScrapeConfig struct {
 	MetricsPath string `yaml:"metrics_path,omitempty"`
 	// The URL scheme with which to fetch metrics from targets.
 	Scheme string `yaml:"scheme,omitempty"`
+	// More than this many samples post metric-relabelling will cause the scrape to fail.
+	SampleLimit uint `yaml:"sample_limit,omitempty"`

 	// We cannot do proper Go type embedding below as the parser will then parse
 	// values arbitrarily into the overflow maps of further-down types.

@@ -133,6 +133,7 @@ var expectedConf = &Config{
 			ScrapeInterval: model.Duration(50 * time.Second),
 			ScrapeTimeout:  model.Duration(5 * time.Second),
+			SampleLimit:    1000,

 			HTTPClientConfig: HTTPClientConfig{
 				BasicAuth: &BasicAuth{

@@ -70,6 +70,8 @@ scrape_configs:
     scrape_interval: 50s
     scrape_timeout: 5s
+    sample_limit: 1000
     metrics_path: /my_path
     scheme: https

@@ -78,6 +78,12 @@ var (
 		},
 		[]string{"scrape_job"},
 	)
+	targetScrapeSampleLimit = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
+			Help: "Total number of scrapes that hit the sample limit and were rejected.",
+		},
+	)
 )

 func init() {
@@ -86,6 +92,7 @@ func init() {
 	prometheus.MustRegister(targetReloadIntervalLength)
 	prometheus.MustRegister(targetSyncIntervalLength)
 	prometheus.MustRegister(targetScrapePoolSyncsCounter)
+	prometheus.MustRegister(targetScrapeSampleLimit)
 }

 // scrapePool manages scrapes for sets of targets.
@@ -103,7 +110,7 @@ type scrapePool struct {
 	loops map[uint64]loop

 	// Constructor for new scrape loops. This is settable for testing convenience.
-	newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender) loop
+	newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender, uint) loop
 }

 func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
@@ -172,7 +179,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
 		var (
 			t       = sp.targets[fp]
 			s       = &targetScraper{Target: t, client: sp.client}
-			newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
+			newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)
 		)

 		wg.Add(1)
@@ -233,7 +240,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 		if _, ok := sp.targets[hash]; !ok {
 			s := &targetScraper{Target: t, client: sp.client}
-			l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
+			l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t), sp.config.SampleLimit)

 			sp.targets[hash] = t
 			sp.loops[hash] = l
@@ -373,18 +380,21 @@ type scrapeLoop struct {
 	mutator func(storage.SampleAppender) storage.SampleAppender
 	// For sending up and scrape_*.
 	reportAppender storage.SampleAppender
+	// Limit on number of samples that will be accepted.
+	sampleLimit uint

 	done   chan struct{}
 	ctx    context.Context
 	cancel func()
 }

-func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
+func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop {
 	sl := &scrapeLoop{
 		scraper:        sc,
 		appender:       app,
 		mutator:        mut,
 		reportAppender: reportApp,
+		sampleLimit:    sampleLimit,
 		done:           make(chan struct{}),
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -460,8 +470,13 @@ func (sl *scrapeLoop) processScrapeResult(samples model.Samples, scrapeErr error
 			app.Append(sample)
 		}

-		// Send samples to storage.
-		sl.append(buf.buffer)
+		if sl.sampleLimit > 0 && uint(len(buf.buffer)) > sl.sampleLimit {
+			scrapeErr = fmt.Errorf("%d samples exceeded limit of %d", len(buf.buffer), sl.sampleLimit)
+			targetScrapeSampleLimit.Inc()
+		} else {
+			// Send samples to storage.
+			sl.append(buf.buffer)
+		}
 	}

 	sl.report(start, time.Since(start), len(samples), len(buf.buffer), scrapeErr)

@@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
 	}

 	// On starting to run, new loops created on reload check whether their preceding
 	// equivalents have been stopped.
-	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
+	newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender, sampleLimit uint) loop {
 		l := &testLoop{}
 		l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
 			if interval != 3*time.Second {
@@ -312,14 +312,13 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 	testCases := []struct {
 		scrapedSamples               model.Samples
 		scrapeError                  error
-		metricRelabelConfigs         []*config.RelabelConfig
+		scrapeConfig                 config.ScrapeConfig
 		expectedReportedSamples      model.Samples
 		expectedIngestedSamplesCount int
 	}{
 		{
 			scrapedSamples: readSamples,
 			scrapeError:    nil,
-			metricRelabelConfigs: []*config.RelabelConfig{},
 			expectedReportedSamples: model.Samples{
 				{
 					Metric: model.Metric{"__name__": "up"},
@@ -342,11 +341,45 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 		{
 			scrapedSamples: readSamples,
 			scrapeError:    nil,
-			metricRelabelConfigs: []*config.RelabelConfig{
-				{
-					Action:       config.RelabelDrop,
-					SourceLabels: model.LabelNames{"__name__"},
-					Regex:        config.MustNewRegexp("a.*"),
+			scrapeConfig: config.ScrapeConfig{
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
+				},
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  1,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  1,
+				},
+			},
+			expectedIngestedSamplesCount: 1,
+		},
+		{
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
+			scrapeConfig: config.ScrapeConfig{
+				SampleLimit: 1,
+				MetricRelabelConfigs: []*config.RelabelConfig{
+					{
+						Action:       config.RelabelDrop,
+						SourceLabels: model.LabelNames{"__name__"},
+						Regex:        config.MustNewRegexp("a.*"),
+					},
 				},
 			},
 			expectedReportedSamples: model.Samples{
@@ -369,9 +402,33 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 			expectedIngestedSamplesCount: 1,
 		},
 		{
-			scrapedSamples:       model.Samples{},
-			scrapeError:          fmt.Errorf("error"),
-			metricRelabelConfigs: []*config.RelabelConfig{},
+			scrapedSamples: readSamples,
+			scrapeError:    nil,
+			scrapeConfig: config.ScrapeConfig{
+				SampleLimit: 1,
+			},
+			expectedReportedSamples: model.Samples{
+				{
+					Metric: model.Metric{"__name__": "up"},
+					Value:  0,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_duration_seconds"},
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_scraped"},
+					Value:  2,
+				},
+				{
+					Metric: model.Metric{"__name__": "scrape_samples_post_metric_relabeling"},
+					Value:  2,
+				},
+			},
+			expectedIngestedSamplesCount: 0,
+		},
+		{
+			scrapedSamples: model.Samples{},
+			scrapeError:    fmt.Errorf("error"),
 			expectedReportedSamples: model.Samples{
 				{
 					Metric: model.Metric{"__name__": "up"},
@@ -393,26 +450,22 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 		},
 	}

-	for _, test := range testCases {
+	for i, test := range testCases {
 		ingestedSamples := &bufferAppender{buffer: model.Samples{}}
 		reportedSamples := &bufferAppender{buffer: model.Samples{}}

 		target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
-		cfg := &config.ScrapeConfig{
-			MetricRelabelConfigs: test.metricRelabelConfigs,
-		}
-		sp := newScrapePool(context.Background(), cfg, ingestedSamples)
+		sp := newScrapePool(context.Background(), &test.scrapeConfig, ingestedSamples)

 		scraper := &testScraper{}
-		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples).(*scrapeLoop)
+		sl := newScrapeLoop(context.Background(), scraper, ingestedSamples, sp.sampleMutator(target), reportedSamples, test.scrapeConfig.SampleLimit).(*scrapeLoop)

 		sl.processScrapeResult(test.scrapedSamples, test.scrapeError, time.Unix(0, 0))

 		// Ignore value of scrape_duration_seconds, as it's time dependant.
 		reportedSamples.buffer[1].Value = 0

 		if !reflect.DeepEqual(reportedSamples.buffer, test.expectedReportedSamples) {
-			t.Errorf("Reported samples did not match expected metrics")
+			t.Errorf("Reported samples did not match expected metrics for case %d", i)
 			t.Errorf("Expected: %v", test.expectedReportedSamples)
 			t.Fatalf("Got: %v", reportedSamples.buffer)
 		}
@@ -425,7 +478,7 @@ func TestScrapeLoopSampleProcessing(t *testing.T) {
 func TestScrapeLoopStop(t *testing.T) {
 	scraper := &testScraper{}

-	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
+	sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil, 0)

 	// The scrape pool synchronizes on stopping scrape loops. However, new scrape
 	// loops are syarted asynchronously. Thus it's possible, that a loop is stopped
@@ -483,7 +536,7 @@ func TestScrapeLoopRun(t *testing.T) {
 	defer close(signal)

 	ctx, cancel := context.WithCancel(context.Background())
-	sl := newScrapeLoop(ctx, scraper, app, mut, reportApp)
+	sl := newScrapeLoop(ctx, scraper, app, mut, reportApp, 0)

 	// The loop must terminate during the initial offset if the context
 	// is canceled.
@@ -521,7 +574,7 @@ func TestScrapeLoopRun(t *testing.T) {
 	}

 	ctx, cancel = context.WithCancel(context.Background())
-	sl = newScrapeLoop(ctx, scraper, app, mut, reportApp)
+	sl = newScrapeLoop(ctx, scraper, app, mut, reportApp, 0)

 	go func() {
 		sl.run(time.Second, 100*time.Millisecond, errc)
