diff --git a/model/labelname.go b/model/labelname.go
index 3b4d3a626..0590fafe6 100644
--- a/model/labelname.go
+++ b/model/labelname.go
@@ -20,6 +20,9 @@ const (
 	JobLabel = LabelName("job")
 	// The label name indicating the instance from which a timeseries was scraped.
 	InstanceLabel = LabelName("instance")
+	// The label name prefix to prepend to a synthetic base label when a label
+	// of the same name is already present in the exported metrics.
+	ExporterLabelPrefix = LabelName("exporter_")
 	// The metric name for the synthetic health variable.
 	ScrapeHealthMetricName = LabelValue("up")
 	// The metric name for synthetic alert timeseries.
diff --git a/retrieval/format/fixtures/empty.json b/retrieval/format/fixtures/empty.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/retrieval/format/fixtures/test0_0_1-0_0_2.json b/retrieval/format/fixtures/test0_0_1-0_0_2.json
new file mode 100644
index 000000000..d14297cae
--- /dev/null
+++ b/retrieval/format/fixtures/test0_0_1-0_0_2.json
@@ -0,0 +1,79 @@
+[
+  {
+    "baseLabels": {
+      "name": "rpc_calls_total",
+      "job": "batch_job"
+    },
+    "docstring": "RPC calls.",
+    "metric": {
+      "type": "counter",
+      "value": [
+        {
+          "labels": {
+            "service": "zed"
+          },
+          "value": 25
+        },
+        {
+          "labels": {
+            "service": "bar"
+          },
+          "value": 25
+        },
+        {
+          "labels": {
+            "service": "foo"
+          },
+          "value": 25
+        }
+      ]
+    }
+  },
+  {
+    "baseLabels": {
+      "name": "rpc_latency_microseconds"
+    },
+    "docstring": "RPC latency.",
+    "metric": {
+      "type": "histogram",
+      "value": [
+        {
+          "labels": {
+            "service": "foo"
+          },
+          "value": {
+            "0.010000": 15.890724674774395,
+            "0.050000": 15.890724674774395,
+            "0.500000": 84.63044031436561,
+            "0.900000": 160.21100853053224,
+            "0.990000": 172.49828748957728
+          }
+        },
+        {
+          "labels": {
+            "service": "zed"
+          },
+          "value": {
+            "0.010000": 0.0459814091918713,
+            "0.050000": 0.0459814091918713,
+            "0.500000": 0.6120456642749681,
+            "0.900000": 1.355915069887731,
+            "0.990000": 1.772733213161236
+          }
+        },
+        {
+          "labels": {
+            "service": "bar"
+          },
+          "value": {
+            "0.010000": 78.48563317257356,
+            "0.050000": 78.48563317257356,
+            "0.500000": 97.31798360385088,
+            "0.900000": 109.89202084295582,
+            "0.990000": 109.99626121011262
+          }
+        }
+      ]
+    }
+  }
+]
diff --git a/retrieval/format/processor.go b/retrieval/format/processor.go
index 86c28de32..29acb4245 100644
--- a/retrieval/format/processor.go
+++ b/retrieval/format/processor.go
@@ -47,3 +47,24 @@ func LabelSet(labels map[string]string) model.LabelSet {
 
 	return labelset
 }
+
+// Helper function to merge a target's base labels on top of the labels of an
+// exported sample. If a label is already defined in the exported sample, we
+// assume that we are scraping an intermediate exporter and attach
+// "exporter_"-prefixes to Prometheus' own base labels.
+func mergeTargetLabels(entityLabels, targetLabels model.LabelSet) model.LabelSet {
+	result := model.LabelSet{}
+
+	for label, value := range entityLabels {
+		result[label] = value
+	}
+
+	for label, labelValue := range targetLabels {
+		if _, exists := result[label]; exists {
+			result[model.ExporterLabelPrefix+label] = labelValue
+		} else {
+			result[label] = labelValue
+		}
+	}
+	return result
+}
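
For reference, the merge rule can be exercised in isolation. The following is a minimal standalone sketch (plain string maps stand in for the model package's LabelSet type) that reproduces the fixture's collision case, where the exported metrics already carry job="batch_job":

    package main

    import "fmt"

    // mergeTargetLabels mirrors the helper above: exported labels keep their
    // original names, and a colliding target label is preserved under an
    // "exporter_"-prefixed name instead of being silently dropped.
    func mergeTargetLabels(entityLabels, targetLabels map[string]string) map[string]string {
    	result := map[string]string{}
    	for label, value := range entityLabels {
    		result[label] = value
    	}
    	for label, value := range targetLabels {
    		if _, exists := result[label]; exists {
    			result["exporter_"+label] = value
    		} else {
    			result[label] = value
    		}
    	}
    	return result
    }

    func main() {
    	exported := map[string]string{"name": "rpc_calls_total", "job": "batch_job"}
    	target := map[string]string{"job": "batch_exporter"}
    	merged := mergeTargetLabels(exported, target)
    	// merged: name=rpc_calls_total, job=batch_job, exporter_job=batch_exporter
    	fmt.Println(merged)
    }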
diff --git a/retrieval/format/processor0_0_1.go b/retrieval/format/processor0_0_1.go
index 9573a3633..e363548e6 100644
--- a/retrieval/format/processor0_0_1.go
+++ b/retrieval/format/processor0_0_1.go
@@ -77,18 +77,8 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
 	pendingSamples := model.Samples{}
 	for _, entity := range entities {
 		for _, value := range entity.Metric.Value {
-			metric := model.Metric{}
-			for label, labelValue := range baseLabels {
-				metric[label] = labelValue
-			}
-
-			for label, labelValue := range entity.BaseLabels {
-				metric[model.LabelName(label)] = model.LabelValue(labelValue)
-			}
-
-			for label, labelValue := range value.Labels {
-				metric[model.LabelName(label)] = model.LabelValue(labelValue)
-			}
+			entityLabels := LabelSet(entity.BaseLabels).Merge(LabelSet(value.Labels))
+			labels := mergeTargetLabels(entityLabels, baseLabels)
 
 			switch entity.Metric.MetricType {
 			case gauge001, counter001:
@@ -100,7 +90,7 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
 				}
 
 				pendingSamples = append(pendingSamples, model.Sample{
-					Metric:    metric,
+					Metric:    model.Metric(labels),
 					Timestamp: timestamp,
 					Value:     model.SampleValue(sampleValue),
 				})
@@ -123,16 +113,16 @@ func (p *processor001) Process(stream io.ReadCloser, timestamp time.Time, baseLa
 					continue
 				}
 
-				childMetric := make(map[model.LabelName]model.LabelValue, len(metric)+1)
+				childMetric := make(map[model.LabelName]model.LabelValue, len(labels)+1)
 
-				for k, v := range metric {
+				for k, v := range labels {
 					childMetric[k] = v
 				}
 
 				childMetric[model.LabelName(percentile001)] = model.LabelValue(percentile)
 
 				pendingSamples = append(pendingSamples, model.Sample{
-					Metric:    childMetric,
+					Metric:    model.Metric(childMetric),
 					Timestamp: timestamp,
 					Value:     model.SampleValue(individualValue),
 				})
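
In both processors the per-sample label set is now built in two steps: the entity's base labels are merged with the individual value's labels first, and only then are the target's base labels attached via mergeTargetLabels. Continuing the plain-map sketch above with the fixture's first counter value:

    // Reuses the sketch's mergeTargetLabels; label values taken from the fixture.
    entityLabels := map[string]string{
    	"name":    "rpc_calls_total", // entity.BaseLabels
    	"job":     "batch_job",       // entity.BaseLabels
    	"service": "zed",             // value.Labels, merged over the base labels
    }
    labels := mergeTargetLabels(entityLabels, map[string]string{"job": "batch_exporter"})
    // labels: name=rpc_calls_total, service=zed, job=batch_job,
    // exporter_job=batch_exporter, matching the expectations in the tests below.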
latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`, + in: "test0_0_1-0_0_2.json", + baseLabels: model.LabelSet{ + model.JobLabel: "batch_exporter", + }, out: model.Samples{ model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.0459814091918713, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 78.48563317257356, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 15.890724674774395, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.0459814091918713, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 78.48563317257356, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 15.890724674774395, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: 
"rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.6120456642749681, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 97.31798360385088, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 84.63044031436561, }, model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 1.355915069887731, }, model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 109.89202084295582, }, model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 160.21100853053224, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 1.772733213161236, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 109.99626121011262, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 172.49828748957728, }, }, @@ -124,9 +129,12 @@ func testProcessor001Process(t test.Tester) { close(c) }(inputChannel) - reader := strings.NewReader(scenario.in) + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", scenario.in, err) + } - err := Processor001.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel) + err = Processor001.Process(reader, time.Now(), scenario.baseLabels, inputChannel) if !test.ErrorEqual(scenario.err, err) { t.Errorf("%d. 
expected err of %s, got %s", i, scenario.err, err) continue diff --git a/retrieval/format/processor0_0_2.go b/retrieval/format/processor0_0_2.go index c8e279099..4bbb22535 100644 --- a/retrieval/format/processor0_0_2.go +++ b/retrieval/format/processor0_0_2.go @@ -53,8 +53,6 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, pendingSamples := model.Samples{} for _, entity := range entities { - entityLabels := baseLabels.Merge(LabelSet(entity.BaseLabels)) - switch entity.Metric.Type { case "counter", "gauge": var values []counter @@ -67,7 +65,8 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, } for _, counter := range values { - labels := entityLabels.Merge(LabelSet(counter.Labels)) + entityLabels := LabelSet(entity.BaseLabels).Merge(LabelSet(counter.Labels)) + labels := mergeTargetLabels(entityLabels, baseLabels) pendingSamples = append(pendingSamples, model.Sample{ Metric: model.Metric(labels), @@ -88,8 +87,9 @@ var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, for _, histogram := range values { for percentile, value := range histogram.Values { - labels := entityLabels.Merge(LabelSet(histogram.Labels)) - labels[model.LabelName("percentile")] = model.LabelValue(percentile) + entityLabels := LabelSet(entity.BaseLabels).Merge(LabelSet(histogram.Labels)) + entityLabels[model.LabelName("percentile")] = model.LabelValue(percentile) + labels := mergeTargetLabels(entityLabels, baseLabels) pendingSamples = append(pendingSamples, model.Sample{ Metric: model.Metric(labels), diff --git a/retrieval/format/processor0_0_2_test.go b/retrieval/format/processor0_0_2_test.go index 7d6a2d532..da0bce660 100644 --- a/retrieval/format/processor0_0_2_test.go +++ b/retrieval/format/processor0_0_2_test.go @@ -18,99 +18,104 @@ import ( "fmt" "github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/utility/test" - "io/ioutil" - "strings" + "os" + "path" "testing" "time" ) func testProcessor002Process(t test.Tester) { var scenarios = []struct { - in string - out model.Samples - err error + in string + baseLabels model.LabelSet + out model.Samples + err error }{ { + in: "empty.json", err: fmt.Errorf("EOF"), }, { - in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`, + in: "test0_0_1-0_0_2.json", + baseLabels: model.LabelSet{ + model.JobLabel: "batch_exporter", + }, out: model.Samples{ model.Sample{ - Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: 
model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"}, + Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total", "job": "batch_job", "exporter_job": "batch_exporter"}, Value: 25, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.0459814091918713, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 78.48563317257356, }, model.Sample{ - Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 15.890724674774395, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.0459814091918713, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 78.48563317257356, }, model.Sample{ - Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 15.890724674774395, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 0.6120456642749681, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 97.31798360385088, }, model.Sample{ - Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 84.63044031436561, }, model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 1.355915069887731, }, model.Sample{ 
- Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 109.89202084295582, }, model.Sample{ - Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 160.21100853053224, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed", "job": "batch_exporter"}, Value: 1.772733213161236, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar", "job": "batch_exporter"}, Value: 109.99626121011262, }, model.Sample{ - Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"}, + Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo", "job": "batch_exporter"}, Value: 172.49828748957728, }, }, @@ -124,9 +129,12 @@ func testProcessor002Process(t test.Tester) { close(c) }(inputChannel) - reader := strings.NewReader(scenario.in) + reader, err := os.Open(path.Join("fixtures", scenario.in)) + if err != nil { + t.Fatalf("%d. couldn't open scenario input file %s: %s", scenario.in, err) + } - err := Processor002.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel) + err = Processor002.Process(reader, time.Now(), scenario.baseLabels, inputChannel) if !test.ErrorEqual(scenario.err, err) { t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) continue