diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index de002a0b2..bcfcc2422 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -81,6 +81,7 @@ func main() { var ( httpRoundTripper = api.DefaultRoundTripper serverURL *url.URL + remoteWriteURL *url.URL httpConfigFilePath string ) @@ -178,6 +179,18 @@ func main() { queryLabelsEnd := queryLabelsCmd.Flag("end", "End time (RFC3339 or Unix timestamp).").String() queryLabelsMatch := queryLabelsCmd.Flag("match", "Series selector. Can be specified multiple times.").Strings() + pushCmd := app.Command("push", "Push to a Prometheus server.") + pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("").ExistingFileVar(&httpConfigFilePath) + pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write (for testing purpose only).") + pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&remoteWriteURL) + metricFiles := pushMetricsCmd.Arg( + "metric-files", + "The metric files to push, default is read from standard input.", + ).ExistingFiles() + pushMetricsLabels := pushMetricsCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times.").Default("job=promtool").StringMap() + pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() + pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() + testCmd := app.Command("test", "Unit testing.") testRulesCmd := testCmd.Command("rules", "Unit tests for rules.") testRulesFiles := testRulesCmd.Arg( @@ -301,6 +314,9 @@ func main() { case checkMetricsCmd.FullCommand(): os.Exit(CheckMetrics(*checkMetricsExtended)) + case pushMetricsCmd.FullCommand(): + os.Exit(PushMetrics(remoteWriteURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *pushMetricsLabels, *metricFiles...)) + case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go new file mode 100644 index 000000000..2bc2237e2 --- /dev/null +++ b/cmd/promtool/metrics.go @@ -0,0 +1,138 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "time" + + "github.com/golang/snappy" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/util/fmtutil" +) + +// Push metrics to a prometheus remote write (for testing purpose only). +func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, labels map[string]string, files ...string) int { + addressURL, err := url.Parse(url.String()) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // build remote write client + writeClient, err := remote.NewWriteClient("remote-write", &remote.ClientConfig{ + URL: &config_util.URL{URL: addressURL}, + Timeout: model.Duration(timeout), + }) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // set custom tls config from httpConfigFilePath + // set custom headers to every request + client, ok := writeClient.(*remote.Client) + if !ok { + fmt.Fprintln(os.Stderr, fmt.Errorf("unexpected type %T", writeClient)) + return failureExitCode + } + client.Client.Transport = &setHeadersTransport{ + RoundTripper: roundTripper, + headers: headers, + } + + var data []byte + var failed bool + + if len(files) == 0 { + data, err = io.ReadAll(os.Stdin) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return failureExitCode + } + fmt.Printf("Parsing standard input\n") + if parseAndPushMetrics(client, data, labels) { + fmt.Printf(" SUCCESS: metrics pushed to remote write.\n") + return successExitCode + } + return failureExitCode + } + + for _, file := range files { + data, err = os.ReadFile(file) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + failed = true + continue + } + + fmt.Printf("Parsing metrics file %s\n", file) + if parseAndPushMetrics(client, data, labels) { + fmt.Printf(" SUCCESS: metrics file %s pushed to remote write.\n", file) + continue + } + failed = true + } + + if failed { + return failureExitCode + } + + return successExitCode +} + +func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]string) bool { + metricsData, err := fmtutil.MetricTextToWriteRequest(bytes.NewReader(data), labels) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + raw, err := metricsData.Marshal() + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + // Encode the request body into snappy encoding. + compressed := snappy.Encode(nil, raw) + err = client.Store(context.Background(), compressed) + if err != nil { + fmt.Fprintln(os.Stderr, " FAILED:", err) + return false + } + + return true +} + +type setHeadersTransport struct { + http.RoundTripper + headers map[string]string +} + +func (s *setHeadersTransport) RoundTrip(req *http.Request) (*http.Response, error) { + for key, value := range s.headers { + req.Header.Set(key, value) + } + return s.RoundTripper.RoundTrip(req) +} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index e149d374a..c36caaf61 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -27,6 +27,7 @@ Tooling for the Prometheus monitoring system. | check | Check the resources for validity. | | query | Run query against a Prometheus server. | | debug | Fetch debug information. | +| push | Push to a Prometheus server. | | test | Unit testing. | | tsdb | Run tsdb commands. | @@ -372,6 +373,48 @@ Fetch all debug information. +### `promtool push` + +Push to a Prometheus server. + + + +#### Flags + +| Flag | Description | +| --- | --- | +| --http.config.file | HTTP client configuration file for promtool to connect to Prometheus. | + + + + +##### `promtool push metrics` + +Push metrics to a prometheus remote write (for testing purpose only). + + + +###### Flags + +| Flag | Description | Default | +| --- | --- | --- | +| --label | Label to attach to metrics. Can be specified multiple times. | `job=promtool` | +| --timeout | The time to wait for pushing metrics. | `30s` | +| --header | Prometheus remote write header. | | + + + + +###### Arguments + +| Argument | Description | Required | +| --- | --- | --- | +| remote-write-url | Prometheus remote write url to push metrics. | Yes | +| metric-files | The metric files to push, default is read from standard input. | | + + + + ### `promtool test` Unit testing. diff --git a/util/fmtutil/format.go b/util/fmtutil/format.go new file mode 100644 index 000000000..9034a90fa --- /dev/null +++ b/util/fmtutil/format.go @@ -0,0 +1,203 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fmtutil + +import ( + "errors" + "fmt" + "io" + "sort" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/prompb" +) + +const ( + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" +) + +var MetricMetadataTypeValue = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +// MetricTextToWriteRequest consumes an io.Reader and return the data in write request format. +func MetricTextToWriteRequest(input io.Reader, labels map[string]string) (*prompb.WriteRequest, error) { + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(input) + if err != nil { + return nil, err + } + return MetricFamiliesToWriteRequest(mf, labels) +} + +// MetricFamiliesToWriteRequest convert metric family to a writerequest. +func MetricFamiliesToWriteRequest(mf map[string]*dto.MetricFamily, extraLabels map[string]string) (*prompb.WriteRequest, error) { + wr := &prompb.WriteRequest{} + + // build metric list + sortedMetricNames := make([]string, 0, len(mf)) + for metric := range mf { + sortedMetricNames = append(sortedMetricNames, metric) + } + // sort metrics name in lexicographical order + sort.Strings(sortedMetricNames) + + for _, metricName := range sortedMetricNames { + // Set metadata writerequest + mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] + metadata := prompb.MetricMetadata{ + MetricFamilyName: mf[metricName].GetName(), + Type: prompb.MetricMetadata_MetricType(mtype), + Help: mf[metricName].GetHelp(), + } + wr.Metadata = append(wr.Metadata, metadata) + + for _, metric := range mf[metricName].Metric { + labels := makeLabelsMap(metric, metricName, extraLabels) + if err := makeTimeseries(wr, labels, metric); err != nil { + return wr, err + } + } + } + return wr, nil +} + +func toTimeseries(wr *prompb.WriteRequest, labels map[string]string, timestamp int64, value float64) { + var ts prompb.TimeSeries + ts.Labels = makeLabels(labels) + ts.Samples = []prompb.Sample{ + { + Timestamp: timestamp, + Value: value, + }, + } + wr.Timeseries = append(wr.Timeseries, ts) +} + +func makeTimeseries(wr *prompb.WriteRequest, labels map[string]string, m *dto.Metric) error { + var err error + + timestamp := m.GetTimestampMs() + if timestamp == 0 { + timestamp = time.Now().UnixNano() / int64(time.Millisecond) + } + + switch { + case m.Gauge != nil: + toTimeseries(wr, labels, timestamp, m.GetGauge().GetValue()) + case m.Counter != nil: + toTimeseries(wr, labels, timestamp, m.GetCounter().GetValue()) + case m.Summary != nil: + metricName := labels[model.MetricNameLabel] + // Preserve metric name order with first quantile labels timeseries then sum suffix timeserie and finally count suffix timeserie + // Add Summary quantile timeseries + quantileLabels := make(map[string]string, len(labels)+1) + for key, value := range labels { + quantileLabels[key] = value + } + + for _, q := range m.GetSummary().Quantile { + quantileLabels[model.QuantileLabel] = fmt.Sprint(q.GetQuantile()) + toTimeseries(wr, quantileLabels, timestamp, q.GetValue()) + } + // Overwrite label model.MetricNameLabel for count and sum metrics + // Add Summary sum timeserie + labels[model.MetricNameLabel] = metricName + sumStr + toTimeseries(wr, labels, timestamp, m.GetSummary().GetSampleSum()) + // Add Summary count timeserie + labels[model.MetricNameLabel] = metricName + countStr + toTimeseries(wr, labels, timestamp, float64(m.GetSummary().GetSampleCount())) + + case m.Histogram != nil: + metricName := labels[model.MetricNameLabel] + // Preserve metric name order with first bucket suffix timeseries then sum suffix timeserie and finally count suffix timeserie + // Add Histogram bucket timeseries + bucketLabels := make(map[string]string, len(labels)+1) + for key, value := range labels { + bucketLabels[key] = value + } + for _, b := range m.GetHistogram().Bucket { + bucketLabels[model.MetricNameLabel] = metricName + bucketStr + bucketLabels[model.BucketLabel] = fmt.Sprint(b.GetUpperBound()) + toTimeseries(wr, bucketLabels, timestamp, float64(b.GetCumulativeCount())) + } + // Overwrite label model.MetricNameLabel for count and sum metrics + // Add Histogram sum timeserie + labels[model.MetricNameLabel] = metricName + sumStr + toTimeseries(wr, labels, timestamp, m.GetHistogram().GetSampleSum()) + // Add Histogram count timeserie + labels[model.MetricNameLabel] = metricName + countStr + toTimeseries(wr, labels, timestamp, float64(m.GetHistogram().GetSampleCount())) + + case m.Untyped != nil: + toTimeseries(wr, labels, timestamp, m.GetUntyped().GetValue()) + default: + err = errors.New("unsupported metric type") + } + return err +} + +func makeLabels(labelsMap map[string]string) []prompb.Label { + // build labels name list + sortedLabelNames := make([]string, 0, len(labelsMap)) + for label := range labelsMap { + sortedLabelNames = append(sortedLabelNames, label) + } + // sort labels name in lexicographical order + sort.Strings(sortedLabelNames) + + var labels []prompb.Label + for _, label := range sortedLabelNames { + labels = append(labels, prompb.Label{ + Name: label, + Value: labelsMap[label], + }) + } + return labels +} + +func makeLabelsMap(m *dto.Metric, metricName string, extraLabels map[string]string) map[string]string { + // build labels map + labels := make(map[string]string, len(m.Label)+len(extraLabels)) + labels[model.MetricNameLabel] = metricName + + // add extra labels + for key, value := range extraLabels { + labels[key] = value + } + + // add metric labels + for _, label := range m.Label { + labelname := label.GetName() + if labelname == model.JobLabel { + labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) + } + labels[labelname] = label.GetValue() + } + + return labels +} diff --git a/util/fmtutil/format_test.go b/util/fmtutil/format_test.go new file mode 100644 index 000000000..0f052f5e7 --- /dev/null +++ b/util/fmtutil/format_test.go @@ -0,0 +1,233 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fmtutil + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/prompb" +) + +var writeRequestFixture = &prompb.WriteRequest{ + Metadata: []prompb.MetricMetadata{ + { + MetricFamilyName: "http_request_duration_seconds", + Type: 3, + Help: "A histogram of the request duration.", + }, + { + MetricFamilyName: "http_requests_total", + Type: 1, + Help: "The total number of HTTP requests.", + }, + { + MetricFamilyName: "rpc_duration_seconds", + Type: 5, + Help: "A summary of the RPC duration in seconds.", + }, + { + MetricFamilyName: "test_metric1", + Type: 2, + Help: "This is a test metric.", + }, + }, + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "0.1"}, + }, + Samples: []prompb.Sample{{Value: 33444, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "0.5"}, + }, + Samples: []prompb.Sample{{Value: 129389, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "1"}, + }, + Samples: []prompb.Sample{{Value: 133988, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_bucket"}, + {Name: "job", Value: "promtool"}, + {Name: "le", Value: "+Inf"}, + }, + Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_sum"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 53423, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_request_duration_seconds_count"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 144320, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_requests_total"}, + {Name: "code", Value: "200"}, + {Name: "job", Value: "promtool"}, + {Name: "method", Value: "post"}, + }, + Samples: []prompb.Sample{{Value: 1027, Timestamp: 1395066363000}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "http_requests_total"}, + {Name: "code", Value: "400"}, + {Name: "job", Value: "promtool"}, + {Name: "method", Value: "post"}, + }, + Samples: []prompb.Sample{{Value: 3, Timestamp: 1395066363000}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.01"}, + }, + Samples: []prompb.Sample{{Value: 3102, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.5"}, + }, + Samples: []prompb.Sample{{Value: 4773, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds"}, + {Name: "job", Value: "promtool"}, + {Name: "quantile", Value: "0.99"}, + }, + Samples: []prompb.Sample{{Value: 76656, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds_sum"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 1.7560473e+07, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "rpc_duration_seconds_count"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 2693, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, + {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, + {Name: "job", Value: "promtool"}, + }, + Samples: []prompb.Sample{{Value: 2, Timestamp: 1}}, + }, + }, +} + +func TestParseAndPushMetricsTextAndFormat(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP http_request_duration_seconds A histogram of the request duration. + # TYPE http_request_duration_seconds histogram + http_request_duration_seconds_bucket{le="0.1"} 33444 1 + http_request_duration_seconds_bucket{le="0.5"} 129389 1 + http_request_duration_seconds_bucket{le="1"} 133988 1 + http_request_duration_seconds_bucket{le="+Inf"} 144320 1 + http_request_duration_seconds_sum 53423 1 + http_request_duration_seconds_count 144320 1 + # HELP http_requests_total The total number of HTTP requests. + # TYPE http_requests_total counter + http_requests_total{method="post",code="200"} 1027 1395066363000 + http_requests_total{method="post",code="400"} 3 1395066363000 + # HELP rpc_duration_seconds A summary of the RPC duration in seconds. + # TYPE rpc_duration_seconds summary + rpc_duration_seconds{quantile="0.01"} 3102 1 + rpc_duration_seconds{quantile="0.5"} 4773 1 + rpc_duration_seconds{quantile="0.99"} 76656 1 + rpc_duration_seconds_sum 1.7560473e+07 1 + rpc_duration_seconds_count 2693 1 + # HELP test_metric1 This is a test metric. + # TYPE test_metric1 gauge + test_metric1{b="c",baz="qux",d="e",foo="bar"} 1 1 + test_metric1{b="c",baz="qux",d="e",foo="bar"} 2 1 + `)) + labels := map[string]string{"job": "promtool"} + + expected, err := MetricTextToWriteRequest(input, labels) + require.NoError(t, err) + + require.Equal(t, writeRequestFixture, expected) +} + +func TestMetricTextToWriteRequestErrorParsingFloatValue(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP http_requests_total The total number of HTTP requests. + # TYPE http_requests_total counter + http_requests_total{method="post",code="200"} 1027Error 1395066363000 + http_requests_total{method="post",code="400"} 3 1395066363000 + `)) + labels := map[string]string{"job": "promtool"} + + _, err := MetricTextToWriteRequest(input, labels) + require.Equal(t, err.Error(), "text format parsing error in line 4: expected float as value, got \"1027Error\"") +} + +func TestMetricTextToWriteRequestErrorParsingMetricType(t *testing.T) { + input := bytes.NewReader([]byte(` + # HELP node_info node info summary. + # TYPE node_info info + node_info{test="summary"} 1 1395066363000 + `)) + labels := map[string]string{"job": "promtool"} + + _, err := MetricTextToWriteRequest(input, labels) + require.Equal(t, err.Error(), "text format parsing error in line 3: unknown metric type \"info\"") +}