diff --git a/retrieval/ingester.go b/retrieval/ingester.go
index 55e0f51ae..428c2833a 100644
--- a/retrieval/ingester.go
+++ b/retrieval/ingester.go
@@ -14,11 +14,15 @@
 package retrieval
 
 import (
+	"errors"
+
 	"github.com/prometheus/client_golang/extraction"
 
 	clientmodel "github.com/prometheus/client_golang/model"
 )
 
+var errIngestChannelFull = errors.New("ingestion channel full")
+
 // MergeLabelsIngester merges a labelset ontop of a given extraction result and
 // passes the result on to another ingester. Label collisions are avoided by
 // appending a label prefix to any newly merged colliding labels.
@@ -42,8 +46,14 @@ func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {
 // ChannelIngester feeds results into a channel without modifying them.
 type ChannelIngester chan<- clientmodel.Samples
 
-// Ingest ingests the provided extraction result by sending it to i.
+// Ingest ingests the provided extraction result by sending it to its channel.
+// It returns an error if the channel is not ready to receive. This is important
+// to fail fast and to not pile up ingestion requests in case of overload.
 func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
-	i <- s
-	return nil
+	select {
+	case i <- s:
+		return nil
+	default:
+		return errIngestChannelFull
+	}
 }
diff --git a/retrieval/target.go b/retrieval/target.go
index 7c4ba49f0..ddd0a0eb4 100644
--- a/retrieval/target.go
+++ b/retrieval/target.go
@@ -272,18 +272,6 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
 			targetIntervalLength.WithLabelValues(interval.String()).Observe(
 				float64(took) / float64(time.Second), // Sub-second precision.
 			)
-			// Throttle the scrape if it took longer than interval - by
-			// sleeping for the time it took longer. This will make the
-			// actual scrape interval increase as long as a scrape takes
-			// longer than the interval we are aiming for.
-			time.Sleep(took - interval)
-			// After the sleep, we should check again if we have been stopped.
-			select {
-			case <-t.scraperStopping:
-				return
-			default:
-				// Do nothing.
-			}
 			t.scrape(ingester)
 		}
 	}
diff --git a/retrieval/target_test.go b/retrieval/target_test.go
index dee3bce31..cb84ef7e4 100644
--- a/retrieval/target_test.go
+++ b/retrieval/target_test.go
@@ -46,6 +46,32 @@ func TestTargetScrapeUpdatesState(t *testing.T) {
 	}
 }
 
+func TestTargetScrapeWithFullChannel(t *testing.T) {
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+			},
+		),
+	)
+	defer server.Close()
+
+	testTarget := NewTarget(
+		server.URL,
+		100*time.Millisecond,
+		clientmodel.LabelSet{"dings": "bums"},
+	).(*target)
+
+	testTarget.scrape(ChannelIngester(make(chan clientmodel.Samples))) // Capacity 0.
+	if testTarget.state != Unreachable {
+		t.Errorf("Expected target state %v, actual: %v", Unreachable, testTarget.state)
+	}
+	if testTarget.lastError != errIngestChannelFull {
+		t.Errorf("Expected target error %q, actual: %q", errIngestChannelFull, testTarget.lastError)
+	}
+}
+
 func TestTargetRecordScrapeHealth(t *testing.T) {
 	testTarget := target{
 		url:        "http://example.url",
@@ -96,12 +122,15 @@
 
 func TestTargetScrapeTimeout(t *testing.T) {
 	signal := make(chan bool, 1)
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		<-signal
-		w.Header().Set("Content-Type", `application/json; schema="prometheus/telemetry"; version=0.0.2`)
-		w.Write([]byte(`[]`))
-	}))
-
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				<-signal
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte{})
+			},
+		),
+	)
 	defer server.Close()
 
 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@@ -137,10 +166,13 @@
 }
 
 func TestTargetScrape404(t *testing.T) {
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusNotFound)
-	}))
-
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusNotFound)
+			},
+		),
+	)
 	defer server.Close()
 
 	testTarget := NewTarget(server.URL, 10*time.Millisecond, clientmodel.LabelSet{})
@@ -179,3 +211,29 @@
 		t.Errorf("Scrape occured after it was stopped.")
 	}
 }
+
+func BenchmarkScrape(b *testing.B) {
+	server := httptest.NewServer(
+		http.HandlerFunc(
+			func(w http.ResponseWriter, r *http.Request) {
+				w.Header().Set("Content-Type", `text/plain; version=0.0.4`)
+				w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n"))
+			},
+		),
+	)
+	defer server.Close()
+
+	testTarget := NewTarget(
+		server.URL,
+		100*time.Millisecond,
+		clientmodel.LabelSet{"dings": "bums"},
+	)
+	ingester := nopIngester{}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if err := testTarget.(*target).scrape(ingester); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
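A note for readers of this patch: the heart of the change is Go's non-blocking send idiom in ChannelIngester.Ingest, a select with a default branch that returns immediately when no receiver is ready, rather than blocking the scrape goroutine. Below is a minimal standalone sketch of that idiom; send, sink, and errFull are names invented for illustration, while the select shape and the role of errIngestChannelFull mirror the patch itself.

package main

import (
	"errors"
	"fmt"
)

// errFull plays the role of errIngestChannelFull above: it signals that
// the consumer is not keeping up and the value was dropped.
var errFull = errors.New("sink channel full")

// send attempts a non-blocking send. The default case fires whenever the
// channel cannot accept the value right away, so overload surfaces as an
// error instead of stalling the producer.
func send(sink chan<- int, v int) error {
	select {
	case sink <- v:
		return nil
	default:
		return errFull
	}
}

func main() {
	// An unbuffered channel with no receiver is "full" from the start.
	sink := make(chan int)
	if err := send(sink, 42); err != nil {
		fmt.Println("dropped:", err) // dropped: sink channel full
	}

	// A buffered channel with free capacity accepts the value immediately.
	buffered := make(chan int, 1)
	if err := send(buffered, 42); err == nil {
		fmt.Println("ingested:", <-buffered) // ingested: 42
	}
}

This is also why TestTargetScrapeWithFullChannel passes an unbuffered channel: with no receiver attached, the first send hits the default branch, scrape returns errIngestChannelFull, and the test expects the target to end up Unreachable with that error recorded.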