|
|
|
@ -158,7 +158,15 @@ type scrapePool struct {
|
|
|
|
|
cancel context.CancelFunc |
|
|
|
|
|
|
|
|
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
|
|
|
|
newLoop func(*Target, scraper, int, bool, []*relabel.Config) loop |
|
|
|
|
newLoop func(scrapeLoopOptions) loop |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type scrapeLoopOptions struct { |
|
|
|
|
target *Target |
|
|
|
|
scraper scraper |
|
|
|
|
limit int |
|
|
|
|
honorLabels bool |
|
|
|
|
mrc []*relabel.Config |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const maxAheadTime = 10 * time.Minute |
|
|
|
@ -189,24 +197,26 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
|
|
|
|
|
loops: map[uint64]loop{}, |
|
|
|
|
logger: logger, |
|
|
|
|
} |
|
|
|
|
sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*relabel.Config) loop { |
|
|
|
|
sp.newLoop = func(opts scrapeLoopOptions) loop { |
|
|
|
|
// Update the targets retrieval function for metadata to a new scrape cache.
|
|
|
|
|
cache := newScrapeCache() |
|
|
|
|
t.setMetadataStore(cache) |
|
|
|
|
opts.target.setMetadataStore(cache) |
|
|
|
|
|
|
|
|
|
return newScrapeLoop( |
|
|
|
|
ctx, |
|
|
|
|
s, |
|
|
|
|
log.With(logger, "target", t), |
|
|
|
|
opts.scraper, |
|
|
|
|
log.With(logger, "target", opts.target), |
|
|
|
|
buffers, |
|
|
|
|
func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) }, |
|
|
|
|
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) }, |
|
|
|
|
func(l labels.Labels) labels.Labels { |
|
|
|
|
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc) |
|
|
|
|
}, |
|
|
|
|
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) }, |
|
|
|
|
func() storage.Appender { |
|
|
|
|
app, err := app.Appender() |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
return appender(app, limit) |
|
|
|
|
return appender(app, opts.limit) |
|
|
|
|
}, |
|
|
|
|
cache, |
|
|
|
|
) |
|
|
|
@ -285,7 +295,13 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
|
|
|
|
var ( |
|
|
|
|
t = sp.activeTargets[fp] |
|
|
|
|
s = &targetScraper{Target: t, client: sp.client, timeout: timeout} |
|
|
|
|
newLoop = sp.newLoop(t, s, limit, honor, mrc) |
|
|
|
|
newLoop = sp.newLoop(scrapeLoopOptions{ |
|
|
|
|
target: t, |
|
|
|
|
scraper: s, |
|
|
|
|
limit: limit, |
|
|
|
|
honorLabels: honor, |
|
|
|
|
mrc: mrc, |
|
|
|
|
}) |
|
|
|
|
) |
|
|
|
|
wg.Add(1) |
|
|
|
|
|
|
|
|
@ -360,7 +376,13 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|
|
|
|
|
|
|
|
|
if _, ok := sp.activeTargets[hash]; !ok { |
|
|
|
|
s := &targetScraper{Target: t, client: sp.client, timeout: timeout} |
|
|
|
|
l := sp.newLoop(t, s, limit, honor, mrc) |
|
|
|
|
l := sp.newLoop(scrapeLoopOptions{ |
|
|
|
|
target: t, |
|
|
|
|
scraper: s, |
|
|
|
|
limit: limit, |
|
|
|
|
honorLabels: honor, |
|
|
|
|
mrc: mrc, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
sp.activeTargets[hash] = t |
|
|
|
|
sp.loops[hash] = l |
|
|
|
|