mirror of https://github.com/prometheus/prometheus
Merge pull request #1394 from prometheus/scraperef2
Refactor and test appender modificationspull/1397/head
commit
a0078ec84c
|
@ -332,6 +332,40 @@ func (t *Target) path() string {
|
|||
return string(t.labels[model.MetricsPathLabel])
|
||||
}
|
||||
|
||||
// wrapAppender wraps a SampleAppender for samples ingested from the target.
|
||||
func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender {
|
||||
// The relabelAppender has to be inside the label-modifying appenders
|
||||
// so the relabeling rules are applied to the correct label set.
|
||||
if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 {
|
||||
app = relabelAppender{
|
||||
SampleAppender: app,
|
||||
relabelings: mrc,
|
||||
}
|
||||
}
|
||||
|
||||
if t.scrapeConfig.HonorLabels {
|
||||
app = honorLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: t.Labels(),
|
||||
}
|
||||
} else {
|
||||
app = ruleLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: t.Labels(),
|
||||
}
|
||||
}
|
||||
return app
|
||||
}
|
||||
|
||||
// wrapReportingAppender wraps an appender for target status report samples.
|
||||
// It ignores any relabeling rules set for the target.
|
||||
func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender {
|
||||
return ruleLabelsAppender{
|
||||
SampleAppender: app,
|
||||
labels: t.Labels(),
|
||||
}
|
||||
}
|
||||
|
||||
// URL returns a copy of the target's URL.
|
||||
func (t *Target) URL() *url.URL {
|
||||
t.RLock()
|
||||
|
@ -459,36 +493,16 @@ const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client
|
|||
|
||||
func (t *Target) scrape(appender storage.SampleAppender) error {
|
||||
var (
|
||||
err error
|
||||
start = time.Now()
|
||||
labels = t.Labels()
|
||||
err error
|
||||
start = time.Now()
|
||||
)
|
||||
defer func(appender storage.SampleAppender) {
|
||||
t.status.setLastError(err)
|
||||
recordScrapeHealth(appender, start, labels, t.status.Health(), time.Since(start))
|
||||
t.report(appender, start, time.Since(start), err)
|
||||
}(appender)
|
||||
|
||||
t.RLock()
|
||||
// The relabelAppender has to be inside the label-modifying appenders
|
||||
// so the relabeling rules are applied to the correct label set.
|
||||
if len(t.scrapeConfig.MetricRelabelConfigs) > 0 {
|
||||
appender = relabelAppender{
|
||||
SampleAppender: appender,
|
||||
relabelings: t.scrapeConfig.MetricRelabelConfigs,
|
||||
}
|
||||
}
|
||||
|
||||
if t.scrapeConfig.HonorLabels {
|
||||
appender = honorLabelsAppender{
|
||||
SampleAppender: appender,
|
||||
labels: labels,
|
||||
}
|
||||
} else {
|
||||
appender = ruleLabelsAppender{
|
||||
SampleAppender: appender,
|
||||
labels: labels,
|
||||
}
|
||||
}
|
||||
appender = t.wrapAppender(appender)
|
||||
|
||||
httpClient := t.httpClient
|
||||
t.RUnlock()
|
||||
|
@ -550,6 +564,38 @@ func (t *Target) scrape(appender storage.SampleAppender) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) {
|
||||
t.status.setLastScrape(start)
|
||||
t.status.setLastError(err)
|
||||
|
||||
ts := model.TimeFromUnixNano(start.UnixNano())
|
||||
|
||||
var health model.SampleValue
|
||||
if err == nil {
|
||||
health = 1
|
||||
}
|
||||
|
||||
healthSample := &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: scrapeHealthMetricName,
|
||||
},
|
||||
Timestamp: ts,
|
||||
Value: health,
|
||||
}
|
||||
durationSample := &model.Sample{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: scrapeDurationMetricName,
|
||||
},
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(float64(duration) / float64(time.Second)),
|
||||
}
|
||||
|
||||
app = t.wrapReportingAppender(app)
|
||||
|
||||
app.Append(healthSample)
|
||||
app.Append(durationSample)
|
||||
}
|
||||
|
||||
// Merges the ingested sample's metric with the label set. On a collision the
|
||||
// value of the ingested label is stored in a label prefixed with 'exported_'.
|
||||
type ruleLabelsAppender struct {
|
||||
|
@ -633,38 +679,3 @@ func (t *Target) MetaLabels() model.LabelSet {
|
|||
|
||||
return t.metaLabels.Clone()
|
||||
}
|
||||
|
||||
func recordScrapeHealth(
|
||||
sampleAppender storage.SampleAppender,
|
||||
timestamp time.Time,
|
||||
labels model.LabelSet,
|
||||
health TargetHealth,
|
||||
scrapeDuration time.Duration,
|
||||
) {
|
||||
healthMetric := make(model.Metric, len(labels)+1)
|
||||
durationMetric := make(model.Metric, len(labels)+1)
|
||||
|
||||
healthMetric[model.MetricNameLabel] = scrapeHealthMetricName
|
||||
durationMetric[model.MetricNameLabel] = scrapeDurationMetricName
|
||||
|
||||
for ln, lv := range labels {
|
||||
healthMetric[ln] = lv
|
||||
durationMetric[ln] = lv
|
||||
}
|
||||
|
||||
ts := model.TimeFromUnixNano(timestamp.UnixNano())
|
||||
|
||||
healthSample := &model.Sample{
|
||||
Metric: healthMetric,
|
||||
Timestamp: ts,
|
||||
Value: health.value(),
|
||||
}
|
||||
durationSample := &model.Sample{
|
||||
Metric: durationMetric,
|
||||
Timestamp: ts,
|
||||
Value: model.SampleValue(float64(scrapeDuration) / float64(time.Second)),
|
||||
}
|
||||
|
||||
sampleAppender.Append(healthSample)
|
||||
sampleAppender.Append(durationSample)
|
||||
}
|
||||
|
|
|
@ -91,6 +91,82 @@ func TestTargetOffset(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTargetWrapReportingAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{}, {}, {},
|
||||
},
|
||||
}
|
||||
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
target.scrapeConfig = cfg
|
||||
app := &nopAppender{}
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := target.wrapReportingAppender(app)
|
||||
|
||||
rl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if rl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", rl.SampleAppender)
|
||||
}
|
||||
|
||||
cfg.HonorLabels = true
|
||||
wrapped = target.wrapReportingAppender(app)
|
||||
|
||||
hl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
if hl.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", hl.SampleAppender)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTargetWrapAppender(t *testing.T) {
|
||||
cfg := &config.ScrapeConfig{
|
||||
MetricRelabelConfigs: []*config.RelabelConfig{
|
||||
{}, {}, {},
|
||||
},
|
||||
}
|
||||
|
||||
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
|
||||
target.scrapeConfig = cfg
|
||||
app := &nopAppender{}
|
||||
|
||||
cfg.HonorLabels = false
|
||||
wrapped := target.wrapAppender(app)
|
||||
|
||||
rl, ok := wrapped.(ruleLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
re, ok := rl.SampleAppender.(relabelAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender)
|
||||
}
|
||||
if re.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||
}
|
||||
|
||||
cfg.HonorLabels = true
|
||||
wrapped = target.wrapAppender(app)
|
||||
|
||||
hl, ok := wrapped.(honorLabelsAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected honorLabelsAppender but got %T", wrapped)
|
||||
}
|
||||
re, ok = hl.SampleAppender.(relabelAppender)
|
||||
if !ok {
|
||||
t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender)
|
||||
}
|
||||
if re.SampleAppender != app {
|
||||
t.Fatalf("Expected base appender but got %T", re.SampleAppender)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOverwriteLabels(t *testing.T) {
|
||||
type test struct {
|
||||
metric string
|
||||
|
@ -293,12 +369,13 @@ func TestTargetScrapeMetricRelabelConfigs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTargetRecordScrapeHealth(t *testing.T) {
|
||||
testTarget := newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"})
|
||||
var (
|
||||
testTarget = newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"})
|
||||
now = model.Now()
|
||||
appender = &collectResultAppender{}
|
||||
)
|
||||
|
||||
now := model.Now()
|
||||
appender := &collectResultAppender{}
|
||||
testTarget.status.setLastError(nil)
|
||||
recordScrapeHealth(appender, now.Time(), testTarget.Labels(), testTarget.status.Health(), 2*time.Second)
|
||||
testTarget.report(appender, now.Time(), 2*time.Second, nil)
|
||||
|
||||
result := appender.result
|
||||
|
||||
|
|
Loading…
Reference in New Issue