Browse Source

Make retrieval work with client's new Ingester interface.

pull/356/head
Julius Volz 11 years ago
parent
commit
f8b20f30ac
  1. 31
      retrieval/target.go

31
retrieval/target.go

@ -202,6 +202,19 @@ func (t *target) Scrape(earliest time.Time, results chan<- *extraction.Result) e
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,application/json;schema=prometheus/telemetry;version=0.0.2;q=0.2,*/*;q=0.1`
type extendLabelsIngester struct {
baseLabels clientmodel.LabelSet
results chan<- *extraction.Result
}
func (i *extendLabelsIngester) Ingest(r *extraction.Result) error {
for _, s := range r.Samples {
s.Metric.MergeFromLabelSet(i.baseLabels, clientmodel.ExporterLabelPrefix)
}
i.results <- r
return nil
}
func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result) (err error) {
defer func(start time.Time) {
ms := float64(time.Since(start)) / float64(time.Millisecond)
@ -238,20 +251,22 @@ func (t *target) scrape(timestamp time.Time, results chan<- *extraction.Result)
baseLabels[baseLabel] = baseValue
}
processOptions := &extraction.ProcessOptions{
Timestamp: timestamp,
BaseLabels: baseLabels,
}
// N.B. - It is explicitly required to extract the entire payload before
// attempting to deserialize, as the underlying reader expects will
// interpret pending data as a truncated message.
// attempting to deserialize, as the underlying reader will interpret
// pending data as a truncated message.
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return err
}
return processor.ProcessSingle(buf, results, processOptions)
ingester := &extendLabelsIngester{
baseLabels: baseLabels,
results: results,
}
processOptions := &extraction.ProcessOptions{
Timestamp: timestamp,
}
return processor.ProcessSingle(buf, ingester, processOptions)
}
func (t *target) State() TargetState {

Loading…
Cancel
Save