diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 9114a4ad2..ad608a105 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -102,7 +102,7 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop + newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender) loop } func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { @@ -171,7 +171,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client} - newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t)) ) wg.Add(1) @@ -232,7 +232,7 @@ func (sp *scrapePool) sync(targets []*Target) { if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client} - l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t)) sp.targets[hash] = t sp.loops[hash] = l @@ -264,30 +264,31 @@ func (sp *scrapePool) sync(targets []*Target) { wg.Wait() } -// sampleAppender returns an appender for ingested samples from the target. -func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { - app := sp.appender - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, +// sampleMutator returns a function that'll take an appender and return an appender for mutated samples. +func (sp *scrapePool) sampleMutator(target *Target) func(storage.SampleAppender) storage.SampleAppender { + return func(app storage.SampleAppender) storage.SampleAppender { + // The relabelAppender has to be inside the label-modifying appenders + // so the relabeling rules are applied to the correct label set. + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + app = relabelAppender{ + SampleAppender: app, + relabelings: mrc, + } } - } - if sp.config.HonorLabels { - app = honorLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), - } - } else { - app = ruleLabelsAppender{ - SampleAppender: app, - labels: target.Labels(), + if sp.config.HonorLabels { + app = honorLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } else { + app = ruleLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } } + return app } - return app } // reportAppender returns an appender for reporting samples for the target. @@ -365,7 +366,11 @@ type loop interface { type scrapeLoop struct { scraper scraper - appender storage.SampleAppender + // Where samples are ultimately sent. + appender storage.SampleAppender + // Applies relabel rules and label handling. + mutator func(storage.SampleAppender) storage.SampleAppender + // For sending up and scrape_*. reportAppender storage.SampleAppender done chan struct{} @@ -373,10 +378,11 @@ type scrapeLoop struct { cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { +func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, + mutator: mut, reportAppender: reportApp, done: make(chan struct{}), } @@ -422,7 +428,15 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { samples, err := sl.scraper.scrape(scrapeCtx, start) if err == nil { - sl.append(samples) + // Collect samples post-relabelling and label handling in a buffer. + buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))} + app := sl.mutator(buf) + for _, sample := range samples { + app.Append(sample) + } + + // Send samples to storage. + sl.append(buf.buffer) } else if errc != nil { errc <- err } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index aaa19132f..db1a1bbc9 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { + newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -269,7 +269,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false - wrapped := sp.sampleAppender(target) + wrapped := sp.sampleMutator(target)(app) rl, ok := wrapped.(ruleLabelsAppender) if !ok { @@ -284,7 +284,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { } cfg.HonorLabels = true - wrapped = sp.sampleAppender(target) + wrapped = sp.sampleMutator(target)(app) hl, ok := wrapped.(honorLabelsAppender) if !ok { @@ -301,7 +301,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { func TestScrapeLoopStop(t *testing.T) { scraper := &testScraper{} - sl := newScrapeLoop(context.Background(), scraper, nil, nil) + sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil) // The scrape pool synchronizes on stopping scrape loops. However, new scrape // loops are syarted asynchronously. Thus it's possible, that a loop is stopped @@ -353,12 +353,13 @@ func TestScrapeLoopRun(t *testing.T) { scraper = &testScraper{} app = &nopAppender{} + mut = func(storage.SampleAppender) storage.SampleAppender { return &nopAppender{} } reportApp = &nopAppender{} ) defer close(signal) ctx, cancel := context.WithCancel(context.Background()) - sl := newScrapeLoop(ctx, scraper, app, reportApp) + sl := newScrapeLoop(ctx, scraper, app, mut, reportApp) // The loop must terminate during the initial offset if the context // is canceled. @@ -396,7 +397,7 @@ func TestScrapeLoopRun(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) - sl = newScrapeLoop(ctx, scraper, app, reportApp) + sl = newScrapeLoop(ctx, scraper, app, mut, reportApp) go func() { sl.run(time.Second, 100*time.Millisecond, errc) diff --git a/retrieval/target.go b/retrieval/target.go index fabf93c19..094d856c1 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -278,6 +278,17 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } +// Appends samples to the given buffer. +type bufferAppender struct { + storage.SampleAppender + buffer model.Samples +} + +func (app bufferAppender) Append(s *model.Sample) error { + app.buffer = append(app.buffer, s) + return nil +} + // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns a nil label set if the target is dropped during relabeling.