Allow buffering of scraped samples before sending them to storage.

pull/2288/head
Brian Brazil 8 years ago
parent e714079cf2
commit b5ded43594

@ -102,7 +102,7 @@ type scrapePool struct {
loops map[uint64]loop loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience. // Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop newLoop func(context.Context, scraper, storage.SampleAppender, func(storage.SampleAppender) storage.SampleAppender, storage.SampleAppender) loop
} }
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
@ -171,7 +171,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var ( var (
t = sp.targets[fp] t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client} s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) newLoop = sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
) )
wg.Add(1) wg.Add(1)
@ -232,7 +232,7 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.targets[hash]; !ok { if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client} s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) l := sp.newLoop(sp.ctx, s, sp.appender, sp.sampleMutator(t), sp.reportAppender(t))
sp.targets[hash] = t sp.targets[hash] = t
sp.loops[hash] = l sp.loops[hash] = l
@ -264,30 +264,31 @@ func (sp *scrapePool) sync(targets []*Target) {
wg.Wait() wg.Wait()
} }
// sampleAppender returns an appender for ingested samples from the target. // sampleMutator returns a function that'll take an appender and return an appender for mutated samples.
func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { func (sp *scrapePool) sampleMutator(target *Target) func(storage.SampleAppender) storage.SampleAppender {
app := sp.appender return func(app storage.SampleAppender) storage.SampleAppender {
// The relabelAppender has to be inside the label-modifying appenders // The relabelAppender has to be inside the label-modifying appenders
// so the relabeling rules are applied to the correct label set. // so the relabeling rules are applied to the correct label set.
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
app = relabelAppender{ app = relabelAppender{
SampleAppender: app, SampleAppender: app,
relabelings: mrc, relabelings: mrc,
}
} }
}
if sp.config.HonorLabels { if sp.config.HonorLabels {
app = honorLabelsAppender{ app = honorLabelsAppender{
SampleAppender: app, SampleAppender: app,
labels: target.Labels(), labels: target.Labels(),
} }
} else { } else {
app = ruleLabelsAppender{ app = ruleLabelsAppender{
SampleAppender: app, SampleAppender: app,
labels: target.Labels(), labels: target.Labels(),
}
} }
return app
} }
return app
} }
// reportAppender returns an appender for reporting samples for the target. // reportAppender returns an appender for reporting samples for the target.
@ -365,7 +366,11 @@ type loop interface {
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
appender storage.SampleAppender // Where samples are ultimately sent.
appender storage.SampleAppender
// Applies relabel rules and label handling.
mutator func(storage.SampleAppender) storage.SampleAppender
// For sending up and scrape_*.
reportAppender storage.SampleAppender reportAppender storage.SampleAppender
done chan struct{} done chan struct{}
@ -373,10 +378,11 @@ type scrapeLoop struct {
cancel func() cancel func()
} }
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { func newScrapeLoop(ctx context.Context, sc scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
sl := &scrapeLoop{ sl := &scrapeLoop{
scraper: sc, scraper: sc,
appender: app, appender: app,
mutator: mut,
reportAppender: reportApp, reportAppender: reportApp,
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -422,7 +428,15 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
samples, err := sl.scraper.scrape(scrapeCtx, start) samples, err := sl.scraper.scrape(scrapeCtx, start)
if err == nil { if err == nil {
sl.append(samples) // Collect samples post-relabelling and label handling in a buffer.
buf := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
app := sl.mutator(buf)
for _, sample := range samples {
app.Append(sample)
}
// Send samples to storage.
sl.append(buf.buffer)
} else if errc != nil { } else if errc != nil {
errc <- err errc <- err
} }

@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
} }
// On starting to run, new loops created on reload check whether their preceding // On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped. // equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { newLoop := func(ctx context.Context, s scraper, app storage.SampleAppender, mut func(storage.SampleAppender) storage.SampleAppender, reportApp storage.SampleAppender) loop {
l := &testLoop{} l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second { if interval != 3*time.Second {
@ -269,7 +269,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
sp := newScrapePool(context.Background(), cfg, app) sp := newScrapePool(context.Background(), cfg, app)
cfg.HonorLabels = false cfg.HonorLabels = false
wrapped := sp.sampleAppender(target) wrapped := sp.sampleMutator(target)(app)
rl, ok := wrapped.(ruleLabelsAppender) rl, ok := wrapped.(ruleLabelsAppender)
if !ok { if !ok {
@ -284,7 +284,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
} }
cfg.HonorLabels = true cfg.HonorLabels = true
wrapped = sp.sampleAppender(target) wrapped = sp.sampleMutator(target)(app)
hl, ok := wrapped.(honorLabelsAppender) hl, ok := wrapped.(honorLabelsAppender)
if !ok { if !ok {
@ -301,7 +301,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
func TestScrapeLoopStop(t *testing.T) { func TestScrapeLoopStop(t *testing.T) {
scraper := &testScraper{} scraper := &testScraper{}
sl := newScrapeLoop(context.Background(), scraper, nil, nil) sl := newScrapeLoop(context.Background(), scraper, nil, nil, nil)
// The scrape pool synchronizes on stopping scrape loops. However, new scrape // The scrape pool synchronizes on stopping scrape loops. However, new scrape
// loops are syarted asynchronously. Thus it's possible, that a loop is stopped // loops are syarted asynchronously. Thus it's possible, that a loop is stopped
@ -353,12 +353,13 @@ func TestScrapeLoopRun(t *testing.T) {
scraper = &testScraper{} scraper = &testScraper{}
app = &nopAppender{} app = &nopAppender{}
mut = func(storage.SampleAppender) storage.SampleAppender { return &nopAppender{} }
reportApp = &nopAppender{} reportApp = &nopAppender{}
) )
defer close(signal) defer close(signal)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx, scraper, app, reportApp) sl := newScrapeLoop(ctx, scraper, app, mut, reportApp)
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
// is canceled. // is canceled.
@ -396,7 +397,7 @@ func TestScrapeLoopRun(t *testing.T) {
} }
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
sl = newScrapeLoop(ctx, scraper, app, reportApp) sl = newScrapeLoop(ctx, scraper, app, mut, reportApp)
go func() { go func() {
sl.run(time.Second, 100*time.Millisecond, errc) sl.run(time.Second, 100*time.Millisecond, errc)

@ -278,6 +278,17 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// Appends samples to the given buffer.
type bufferAppender struct {
storage.SampleAppender
buffer model.Samples
}
func (app bufferAppender) Append(s *model.Sample) error {
app.buffer = append(app.buffer, s)
return nil
}
// populateLabels builds a label set from the given label set and scrape configuration. // populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value. // It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling. // Returns a nil label set if the target is dropped during relabeling.

Loading…
Cancel
Save