From 11a577fcd054066f1e4ab3da6f2d8b38132d97b6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 20 Aug 2015 19:02:29 +0200 Subject: [PATCH] Switch to common/expfmt for extraction --- retrieval/target.go | 69 ++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/retrieval/target.go b/retrieval/target.go index d780115b2..2bab85444 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -18,6 +18,7 @@ import ( "crypto/x509" "errors" "fmt" + "io" "io/ioutil" "math/rand" "net/http" @@ -26,11 +27,10 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/extraction" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/log" - + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" + "github.com/prometheus/log" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" @@ -301,30 +301,6 @@ func (t *Target) String() string { return t.url.Host } -// Ingest implements an extraction.Ingester. -func (t *Target) Ingest(s model.Samples) error { - t.RLock() - deadline := t.deadline - t.RUnlock() - // Since the regular case is that ingestedSamples is ready to receive, - // first try without setting a timeout so that we don't need to allocate - // a timer most of the time. - select { - case t.ingestedSamples <- s: - return nil - default: - select { - case t.ingestedSamples <- s: - return nil - case <-time.After(deadline / 10): - return errIngestChannelFull - } - } -} - -// Ensure that Target implements extraction.Ingester at compile time. -var _ extraction.Ingester = (*Target)(nil) - // RunScraper implements Target. func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { defer close(t.scraperStopped) @@ -416,7 +392,7 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { defer func() { t.status.setLastError(err) - recordScrapeHealth(sampleAppender, model.TimeFromTime(start), baseLabels, t.status.Health(), time.Since(start)) + recordScrapeHealth(sampleAppender, start, baseLabels, t.status.Health(), time.Since(start)) }() req, err := http.NewRequest("GET", t.URL().String(), nil) @@ -429,27 +405,30 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { if err != nil { return err } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("server returned HTTP status %s", resp.Status) } - processor, err := extraction.ProcessorForRequestHeader(resp.Header) + dec, err := expfmt.NewDecoder(resp.Body, resp.Header) if err != nil { return err } + defer resp.Body.Close() - t.ingestedSamples = make(chan model.Samples, ingestedSamplesCap) - - processOptions := &extraction.ProcessOptions{ - Timestamp: model.TimeFromTime(start), + sdec := expfmt.SampleDecoder{ + Dec: dec, + Opts: &expfmt.DecodeOptions{ + Timestamp: model.TimeFromUnixNano(start.UnixNano()), + }, } - go func() { - err = processor.ProcessSingle(resp.Body, t, processOptions) - close(t.ingestedSamples) - }() - for samples := range t.ingestedSamples { + var samples model.Vector + + for { + if err = sdec.Decode(&samples); err != nil { + break + } + for _, s := range samples { if honorLabels { // Merge the metric with the baseLabels for labels not already set in the @@ -485,6 +464,10 @@ func (t *Target) scrape(sampleAppender storage.SampleAppender) (err error) { sampleAppender.Append(s) } } + + if err == io.EOF { + return nil + } return err } @@ -540,7 +523,7 @@ func (t *Target) MetaLabels() model.LabelSet { func recordScrapeHealth( sampleAppender storage.SampleAppender, - timestamp model.Time, + timestamp time.Time, baseLabels model.LabelSet, health TargetHealth, scrapeDuration time.Duration, @@ -561,14 +544,16 @@ func recordScrapeHealth( healthValue = model.SampleValue(1) } + ts := model.TimeFromUnixNano(timestamp.UnixNano()) + healthSample := &model.Sample{ Metric: healthMetric, - Timestamp: timestamp, + Timestamp: ts, Value: healthValue, } durationSample := &model.Sample{ Metric: durationMetric, - Timestamp: timestamp, + Timestamp: ts, Value: model.SampleValue(float64(scrapeDuration) / float64(time.Second)), }