|
|
|
@ -130,7 +130,7 @@ type scrapePool struct {
|
|
|
|
|
cancel context.CancelFunc |
|
|
|
|
|
|
|
|
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
|
|
|
|
newLoop func(*Target, scraper) loop |
|
|
|
|
newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig) loop |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const maxAheadTime = 10 * time.Minute |
|
|
|
@ -160,15 +160,21 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
|
|
|
|
|
loops: map[uint64]loop{}, |
|
|
|
|
logger: logger, |
|
|
|
|
} |
|
|
|
|
sp.newLoop = func(t *Target, s scraper) loop { |
|
|
|
|
sp.newLoop = func(t *Target, s scraper, limit int, honor bool, mrc []*config.RelabelConfig) loop { |
|
|
|
|
return newScrapeLoop( |
|
|
|
|
ctx, |
|
|
|
|
s, |
|
|
|
|
log.With(logger, "target", t), |
|
|
|
|
buffers, |
|
|
|
|
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, |
|
|
|
|
func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) }, |
|
|
|
|
sp.appender, |
|
|
|
|
func(l labels.Labels) labels.Labels { return mutateSampleLabels(l, t, honor, mrc) }, |
|
|
|
|
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, t) }, |
|
|
|
|
func() storage.Appender { |
|
|
|
|
app, err := app.Appender() |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
return appender(app, limit) |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -218,13 +224,16 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|
|
|
|
wg sync.WaitGroup |
|
|
|
|
interval = time.Duration(sp.config.ScrapeInterval) |
|
|
|
|
timeout = time.Duration(sp.config.ScrapeTimeout) |
|
|
|
|
limit = int(sp.config.SampleLimit) |
|
|
|
|
honor = sp.config.HonorLabels |
|
|
|
|
mrc = sp.config.MetricRelabelConfigs |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for fp, oldLoop := range sp.loops { |
|
|
|
|
var ( |
|
|
|
|
t = sp.targets[fp] |
|
|
|
|
s = &targetScraper{Target: t, client: sp.client, timeout: timeout} |
|
|
|
|
newLoop = sp.newLoop(t, s) |
|
|
|
|
newLoop = sp.newLoop(t, s, limit, honor, mrc) |
|
|
|
|
) |
|
|
|
|
wg.Add(1) |
|
|
|
|
|
|
|
|
@ -295,6 +304,9 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|
|
|
|
uniqueTargets = map[uint64]struct{}{} |
|
|
|
|
interval = time.Duration(sp.config.ScrapeInterval) |
|
|
|
|
timeout = time.Duration(sp.config.ScrapeTimeout) |
|
|
|
|
limit = int(sp.config.SampleLimit) |
|
|
|
|
honor = sp.config.HonorLabels |
|
|
|
|
mrc = sp.config.MetricRelabelConfigs |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for _, t := range targets { |
|
|
|
@ -304,7 +316,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|
|
|
|
|
|
|
|
|
if _, ok := sp.targets[hash]; !ok { |
|
|
|
|
s := &targetScraper{Target: t, client: sp.client, timeout: timeout} |
|
|
|
|
l := sp.newLoop(t, s) |
|
|
|
|
l := sp.newLoop(t, s, limit, honor, mrc) |
|
|
|
|
|
|
|
|
|
sp.targets[hash] = t |
|
|
|
|
sp.loops[hash] = l |
|
|
|
@ -340,10 +352,10 @@ func (sp *scrapePool) sync(targets []*Target) {
|
|
|
|
|
wg.Wait() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels { |
|
|
|
|
func mutateSampleLabels(lset labels.Labels, target *Target, honor bool, rc []*config.RelabelConfig) labels.Labels { |
|
|
|
|
lb := labels.NewBuilder(lset) |
|
|
|
|
|
|
|
|
|
if sp.config.HonorLabels { |
|
|
|
|
if honor { |
|
|
|
|
for _, l := range target.Labels() { |
|
|
|
|
if !lset.Has(l.Name) { |
|
|
|
|
lb.Set(l.Name, l.Value) |
|
|
|
@ -367,14 +379,14 @@ func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) lab
|
|
|
|
|
|
|
|
|
|
res := lb.Labels() |
|
|
|
|
|
|
|
|
|
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { |
|
|
|
|
res = relabel.Process(res, mrc...) |
|
|
|
|
if len(rc) > 0 { |
|
|
|
|
res = relabel.Process(res, rc...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return res |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels { |
|
|
|
|
func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels { |
|
|
|
|
lb := labels.NewBuilder(lset) |
|
|
|
|
|
|
|
|
|
for _, l := range target.Labels() { |
|
|
|
@ -389,22 +401,17 @@ func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Targe
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// appender returns an appender for ingested samples from the target.
|
|
|
|
|
func (sp *scrapePool) appender() storage.Appender { |
|
|
|
|
app, err := sp.appendable.Appender() |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func appender(app storage.Appender, limit int) storage.Appender { |
|
|
|
|
app = &timeLimitAppender{ |
|
|
|
|
Appender: app, |
|
|
|
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The limit is applied after metrics are potentially dropped via relabeling.
|
|
|
|
|
if sp.config.SampleLimit > 0 { |
|
|
|
|
if limit > 0 { |
|
|
|
|
app = &limitAppender{ |
|
|
|
|
Appender: app, |
|
|
|
|
limit: int(sp.config.SampleLimit), |
|
|
|
|
limit: limit, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return app |
|
|
|
|