mirror of https://github.com/prometheus/prometheus
Merge pull request #9247 from prometheus/superq/scrape_timeout_feature
Add scrape_timeout_seconds metric (behind feature flag) pull/9295/head
commit
9de62707b3
|
@ -107,6 +107,7 @@ type flagConfig struct {
|
|||
outageTolerance model.Duration
|
||||
resendDelay model.Duration
|
||||
web web.Options
|
||||
scrape scrape.Options
|
||||
tsdb tsdbOptions
|
||||
lookbackDelta model.Duration
|
||||
webTimeout model.Duration
|
||||
|
@ -152,6 +153,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
|
|||
case "memory-snapshot-on-shutdown":
|
||||
c.tsdb.EnableMemorySnapshotOnShutdown = true
|
||||
level.Info(logger).Log("msg", "Experimental memory snapshot on shutdown enabled")
|
||||
case "extra-scrape-metrics":
|
||||
c.scrape.ExtraMetrics = true
|
||||
level.Info(logger).Log("msg", "Experimental additional scrape metrics")
|
||||
case "":
|
||||
continue
|
||||
default:
|
||||
|
@ -312,7 +316,7 @@ func main() {
|
|||
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
|
||||
Default("50000000").IntVar(&cfg.queryMaxSamples)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
Default("").StringsVar(&cfg.featureList)
|
||||
|
||||
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
||||
|
@ -457,7 +461,7 @@ func main() {
|
|||
ctxNotify, cancelNotify = context.WithCancel(context.Background())
|
||||
discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
|
||||
|
||||
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
|
||||
scrapeManager = scrape.NewManager(&cfg.scrape, log.With(logger, "component", "scrape manager"), fanoutStorage)
|
||||
|
||||
opts = promql.EngineOpts{
|
||||
Logger: log.With(logger, "component", "query engine"),
|
||||
|
|
|
@ -61,3 +61,11 @@ Exemplar storage is implemented as a fixed size circular buffer that stores exem
|
|||
This takes the snapshot of the chunks that are in memory along with the series information when shutting down and stores
|
||||
it on disk. This will reduce the startup time since the memory state can be restored with this snapshot and m-mapped
|
||||
chunks without the need of WAL replay.
|
||||
|
||||
## Extra Scrape Metrics
|
||||
|
||||
`--enable-feature=extra-scrape-metrics`
|
||||
|
||||
When enabled, for each instance scrape, Prometheus stores a sample in the following additional time series:
|
||||
|
||||
* `scrape_timeout_seconds`. The configured `scrape_timeout` for a target. This allows you to measure each target to find out how close it is to timing out with `scrape_duration_seconds / scrape_timeout_seconds`.
|
||||
|
|
|
@ -99,12 +99,16 @@ func (mc *MetadataMetricsCollector) Collect(ch chan<- prometheus.Metric) {
|
|||
}
|
||||
|
||||
// NewManager is the Manager constructor. A nil Options or logger is replaced with a default.
|
||||
func NewManager(logger log.Logger, app storage.Appendable) *Manager {
|
||||
func NewManager(o *Options, logger log.Logger, app storage.Appendable) *Manager {
|
||||
if o == nil {
|
||||
o = &Options{}
|
||||
}
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
}
|
||||
m := &Manager{
|
||||
append: app,
|
||||
opts: o,
|
||||
logger: logger,
|
||||
scrapeConfigs: make(map[string]*config.ScrapeConfig),
|
||||
scrapePools: make(map[string]*scrapePool),
|
||||
|
@ -116,9 +120,15 @@ func NewManager(logger log.Logger, app storage.Appendable) *Manager {
|
|||
return m
|
||||
}
|
||||
|
||||
// Options are the configuration parameters to the scrape manager.
// NewManager stores the pointer it is given and substitutes an empty
// Options when passed nil.
|
||||
type Options struct {
|
||||
// ExtraMetrics is forwarded to each new scrape pool as its
// reportScrapeTimeout argument; when true, scrape loops append a
// scrape_timeout_seconds sample alongside the other report samples.
ExtraMetrics bool
|
||||
}
|
||||
|
||||
// Manager maintains a set of scrape pools and manages start/stop cycles
|
||||
// when receiving new target groups from the discovery manager.
|
||||
type Manager struct {
|
||||
opts *Options
|
||||
logger log.Logger
|
||||
append storage.Appendable
|
||||
graceShut chan struct{}
|
||||
|
@ -181,7 +191,7 @@ func (m *Manager) reload() {
|
|||
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
|
||||
continue
|
||||
}
|
||||
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
|
||||
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName), m.opts.ExtraMetrics)
|
||||
if err != nil {
|
||||
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
|
||||
continue
|
||||
|
|
|
@ -398,7 +398,8 @@ scrape_configs:
|
|||
ch = make(chan struct{}, 1)
|
||||
)
|
||||
|
||||
scrapeManager := NewManager(nil, nil)
|
||||
opts := Options{}
|
||||
scrapeManager := NewManager(&opts, nil, nil)
|
||||
newLoop := func(scrapeLoopOptions) loop {
|
||||
ch <- struct{}{}
|
||||
return noopLoop()
|
||||
|
@ -460,7 +461,8 @@ scrape_configs:
|
|||
}
|
||||
|
||||
func TestManagerTargetsUpdates(t *testing.T) {
|
||||
m := NewManager(nil, nil)
|
||||
opts := Options{}
|
||||
m := NewManager(&opts, nil, nil)
|
||||
|
||||
ts := make(chan map[string][]*targetgroup.Group)
|
||||
go m.Run(ts)
|
||||
|
@ -512,7 +514,8 @@ global:
|
|||
return cfg
|
||||
}
|
||||
|
||||
scrapeManager := NewManager(nil, nil)
|
||||
opts := Options{}
|
||||
scrapeManager := NewManager(&opts, nil, nil)
|
||||
|
||||
// Load the first config.
|
||||
cfg1 := getConfig("ha1")
|
||||
|
|
|
@ -263,7 +263,7 @@ const maxAheadTime = 10 * time.Minute
|
|||
|
||||
type labelsMutator func(labels.Labels) labels.Labels
|
||||
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
|
||||
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger, reportScrapeTimeout bool) (*scrapePool, error) {
|
||||
targetScrapePools.Inc()
|
||||
if logger == nil {
|
||||
logger = log.NewNopLogger()
|
||||
|
@ -311,6 +311,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
|
|||
opts.labelLimits,
|
||||
opts.interval,
|
||||
opts.timeout,
|
||||
reportScrapeTimeout,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -828,6 +829,8 @@ type scrapeLoop struct {
|
|||
stopped chan struct{}
|
||||
|
||||
disabledEndOfRunStalenessMarkers bool
|
||||
|
||||
reportScrapeTimeout bool
|
||||
}
|
||||
|
||||
// scrapeCache tracks mappings of exposed metric strings to label sets and
|
||||
|
@ -1087,6 +1090,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
labelLimits *labelLimits,
|
||||
interval time.Duration,
|
||||
timeout time.Duration,
|
||||
reportScrapeTimeout bool,
|
||||
) *scrapeLoop {
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
|
@ -1112,6 +1116,7 @@ func newScrapeLoop(ctx context.Context,
|
|||
labelLimits: labelLimits,
|
||||
interval: interval,
|
||||
timeout: timeout,
|
||||
reportScrapeTimeout: reportScrapeTimeout,
|
||||
}
|
||||
sl.ctx, sl.cancel = context.WithCancel(ctx)
|
||||
|
||||
|
@ -1216,7 +1221,7 @@ func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, app
|
|||
}()
|
||||
|
||||
defer func() {
|
||||
if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
||||
if err = sl.report(app, appendTime, timeout, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
|
||||
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
|
||||
}
|
||||
}()
|
||||
|
@ -1604,9 +1609,10 @@ const (
|
|||
scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff"
|
||||
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
|
||||
scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff"
|
||||
scrapeTimeoutMetricName = "scrape_timeout_seconds" + "\xff"
|
||||
)
|
||||
|
||||
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
|
||||
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, timeout, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
|
||||
sl.scraper.Report(start, duration, scrapeErr)
|
||||
|
||||
ts := timestamp.FromTime(start)
|
||||
|
@ -1631,6 +1637,11 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
|
|||
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, float64(seriesAdded)); err != nil {
|
||||
return
|
||||
}
|
||||
if sl.reportScrapeTimeout {
|
||||
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, timeout.Seconds()); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1654,6 +1665,11 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er
|
|||
if err = sl.addReportSample(app, scrapeSeriesAddedMetricName, ts, stale); err != nil {
|
||||
return
|
||||
}
|
||||
if sl.reportScrapeTimeout {
|
||||
if err = sl.addReportSample(app, scrapeTimeoutMetricName, ts, stale); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ func TestNewScrapePool(t *testing.T) {
|
|||
var (
|
||||
app = &nopAppendable{}
|
||||
cfg = &config.ScrapeConfig{}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil)
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, false)
|
||||
)
|
||||
|
||||
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
|
||||
|
@ -92,7 +92,7 @@ func TestDroppedTargetsList(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil)
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, false)
|
||||
expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __scrape_interval__=\"0s\", __scrape_timeout__=\"0s\", job=\"dropMe\"}"
|
||||
expectedLength = 1
|
||||
)
|
||||
|
@ -456,7 +456,7 @@ func TestScrapePoolTargetLimit(t *testing.T) {
|
|||
func TestScrapePoolAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{}
|
||||
app := &nopAppendable{}
|
||||
sp, _ := newScrapePool(cfg, app, 0, nil)
|
||||
sp, _ := newScrapePool(cfg, app, 0, nil, false)
|
||||
|
||||
loop := sp.newLoop(scrapeLoopOptions{
|
||||
target: &Target{},
|
||||
|
@ -497,7 +497,7 @@ func TestScrapePoolRaces(t *testing.T) {
|
|||
newConfig := func() *config.ScrapeConfig {
|
||||
return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout}
|
||||
}
|
||||
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil)
|
||||
sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil, false)
|
||||
tgts := []*targetgroup.Group{
|
||||
{
|
||||
Targets: []model.LabelSet{
|
||||
|
@ -589,6 +589,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
|
|||
nil,
|
||||
1,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
// The scrape pool synchronizes on stopping scrape loops. However, new scrape
|
||||
|
@ -656,6 +657,7 @@ func TestScrapeLoopStop(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
// Terminate loop after 2 scrapes.
|
||||
|
@ -726,6 +728,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
nil,
|
||||
time.Second,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
// The loop must terminate during the initial offset if the context
|
||||
|
@ -776,6 +779,7 @@ func TestScrapeLoopRun(t *testing.T) {
|
|||
nil,
|
||||
time.Second,
|
||||
100*time.Millisecond,
|
||||
false,
|
||||
)
|
||||
|
||||
go func() {
|
||||
|
@ -830,6 +834,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
|
|||
nil,
|
||||
time.Second,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
forcedErr := fmt.Errorf("forced err")
|
||||
|
@ -883,6 +888,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
|
@ -935,6 +941,7 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
|
@ -976,6 +983,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
// Succeed once, several failures, then stop.
|
||||
numScrapes := 0
|
||||
|
@ -1033,6 +1041,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
// Succeed once, several failures, then stop.
|
||||
|
@ -1094,6 +1103,7 @@ func TestScrapeLoopCache(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
numScrapes := 0
|
||||
|
@ -1171,6 +1181,7 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
numScrapes := 0
|
||||
|
@ -1280,6 +1291,7 @@ func TestScrapeLoopAppend(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1324,6 +1336,7 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
fakeRef := uint64(1)
|
||||
|
@ -1376,6 +1389,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
// Get the value of the Counter before performing the append.
|
||||
|
@ -1448,6 +1462,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1491,6 +1506,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1537,6 +1553,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1641,6 +1658,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1701,6 +1719,7 @@ func TestScrapeLoopAppendExemplarSeries(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -1748,6 +1767,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
||||
|
@ -1779,6 +1799,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
|
||||
|
@ -1823,6 +1844,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Unix(1, 0)
|
||||
|
@ -1863,6 +1885,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now().Add(20 * time.Minute)
|
||||
|
@ -2116,6 +2139,7 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -2152,6 +2176,7 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
now := time.Now()
|
||||
|
@ -2187,6 +2212,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
|
@ -2240,6 +2266,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
|
@ -2332,7 +2359,7 @@ func TestReuseScrapeCache(t *testing.T) {
|
|||
ScrapeInterval: model.Duration(5 * time.Second),
|
||||
MetricsPath: "/metrics",
|
||||
}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil)
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, false)
|
||||
t1 = &Target{
|
||||
discoveredLabels: labels.Labels{
|
||||
labels.Label{
|
||||
|
@ -2460,6 +2487,7 @@ func TestScrapeAddFast(t *testing.T) {
|
|||
nil,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
|
@ -2489,7 +2517,7 @@ func TestReuseCacheRace(t *testing.T) {
|
|||
ScrapeInterval: model.Duration(5 * time.Second),
|
||||
MetricsPath: "/metrics",
|
||||
}
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil)
|
||||
sp, _ = newScrapePool(cfg, app, 0, nil, false)
|
||||
t1 = &Target{
|
||||
discoveredLabels: labels.Labels{
|
||||
labels.Label{
|
||||
|
@ -2546,6 +2574,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
|
|||
nil,
|
||||
10*time.Millisecond,
|
||||
time.Hour,
|
||||
false,
|
||||
)
|
||||
|
||||
numScrapes := 0
|
||||
|
@ -2677,6 +2706,7 @@ func TestScrapeLoopLabelLimit(t *testing.T) {
|
|||
&test.labelLimits,
|
||||
0,
|
||||
0,
|
||||
false,
|
||||
)
|
||||
|
||||
slApp := sl.appender(context.Background())
|
||||
|
@ -2715,7 +2745,7 @@ func TestTargetScrapeIntervalAndTimeoutRelabel(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil)
|
||||
sp, _ := newScrapePool(config, &nopAppendable{}, 0, nil, false)
|
||||
tgts := []*targetgroup.Group{
|
||||
{
|
||||
Targets: []model.LabelSet{{model.AddressLabel: "127.0.0.1:9090"}},
|
||||
|
|
Loading…
Reference in New Issue