Fix scrape timestamps to reduce sample time jitter.

We're currently timestamping samples with the time at the end of a scrape
iteration. It makes more sense to use a timestamp from the beginning of the
scrape for two reasons:

a) this time is more relevant to the scraped values than the time at the
   end of the HTTP-GET + JSON decoding work.
b) it reduces sample timestamp jitter if we measure at the beginning, and
   not at the completion of a scrape.
pull/139/head
Julius Volz 12 years ago
parent f817106d6a
commit a1ba23038e

@ -16,6 +16,7 @@ package format
import ( import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"io" "io"
"time"
) )
// Processor is responsible for decoding the actual message responses from // Processor is responsible for decoding the actual message responses from
@ -23,5 +24,5 @@ import (
// to the results channel. // to the results channel.
type Processor interface { type Processor interface {
// Process performs the work on the input and closes the incoming stream. // Process performs the work on the input and closes the incoming stream.
Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error)
} }

@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"io" "io"
"io/ioutil" "io/ioutil"
"time"
) )
const ( const (
@ -57,7 +58,7 @@ type entity001 []struct {
} `json:"metric"` } `json:"metric"`
} }
func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) { func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) {
// TODO(matt): Replace with plain-jane JSON unmarshalling. // TODO(matt): Replace with plain-jane JSON unmarshalling.
defer stream.Close() defer stream.Close()
@ -73,8 +74,6 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
return return
} }
now := p.time.Now()
// TODO(matt): This outer loop is a great basis for parallelization. // TODO(matt): This outer loop is a great basis for parallelization.
for _, entity := range entities { for _, entity := range entities {
for _, value := range entity.Metric.Value { for _, value := range entity.Metric.Value {
@ -101,7 +100,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
sample := model.Sample{ sample := model.Sample{
Metric: metric, Metric: metric,
Timestamp: now, Timestamp: timestamp,
Value: model.SampleValue(sampleValue), Value: model.SampleValue(sampleValue),
} }
@ -136,7 +135,7 @@ func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet,
sample := model.Sample{ sample := model.Sample{
Metric: childMetric, Metric: childMetric,
Timestamp: now, Timestamp: timestamp,
Value: model.SampleValue(individualValue), Value: model.SampleValue(individualValue),
} }

@ -21,6 +21,7 @@ import (
"io/ioutil" "io/ioutil"
"strings" "strings"
"testing" "testing"
"time"
) )
func testProcessor001Process(t test.Tester) { func testProcessor001Process(t test.Tester) {
@ -172,7 +173,7 @@ func testProcessor001Process(t test.Tester) {
reader := strings.NewReader(scenario.in) reader := strings.NewReader(scenario.in)
err := Processor001.Process(ioutil.NopCloser(reader), model.LabelSet{}, inputChannel) err := Processor001.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel)
if !test.ErrorEqual(scenario.err, err) { if !test.ErrorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue continue

@ -162,6 +162,8 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
done <- true done <- true
}() }()
now := time.Now()
var resp *http.Response // Don't shadow "err" from the enclosing function. var resp *http.Response // Don't shadow "err" from the enclosing function.
resp, err = http.Get(t.Address()) resp, err = http.Get(t.Address())
if err != nil { if err != nil {
@ -182,7 +184,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
baseLabels[baseLabel] = baseValue baseLabels[baseLabel] = baseValue
} }
err = processor.Process(resp.Body, baseLabels, results) err = processor.Process(resp.Body, now, baseLabels, results)
if err != nil { if err != nil {
return return
} }

Loading…
Cancel
Save