Browse Source

Add OTLP Ingestion endpoint (#12571)

* Add OTLP Ingestion endpoint

We copy files from the otel-collector-contrib. See the README in
`storage/remote/otlptranslator/README.md`.

This supersedes: https://github.com/prometheus/prometheus/pull/11965

Signed-off-by: gouthamve <gouthamve@gmail.com>

* Return a 200 OK

It is what the OTEL Golang SDK expect :(

https://github.com/open-telemetry/opentelemetry-go/issues/4363

Signed-off-by: Goutham <gouthamve@gmail.com>

---------

Signed-off-by: gouthamve <gouthamve@gmail.com>
Signed-off-by: Goutham <gouthamve@gmail.com>
pull/11847/merge
Goutham Veeramachaneni 1 year ago committed by GitHub
parent
commit
ad4f514e66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .github/CODEOWNERS
  2. 3
      .golangci.yml
  3. 5
      cmd/prometheus/main.go
  4. 2
      docs/command-line/prometheus.md
  5. 3
      go.mod
  6. 9
      go.sum
  7. 64
      storage/remote/codec.go
  8. 23
      storage/remote/otlptranslator/README.md
  9. 41
      storage/remote/otlptranslator/prometheus/normalize_label.go
  10. 19
      storage/remote/otlptranslator/prometheus/normalize_label_test.go
  11. 251
      storage/remote/otlptranslator/prometheus/normalize_name.go
  12. 180
      storage/remote/otlptranslator/prometheus/normalize_name_test.go
  13. 34
      storage/remote/otlptranslator/prometheus/testutils_test.go
  14. 559
      storage/remote/otlptranslator/prometheusremotewrite/helper.go
  15. 158
      storage/remote/otlptranslator/prometheusremotewrite/histograms.go
  16. 114
      storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go
  17. 104
      storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go
  18. 15
      storage/remote/otlptranslator/update-copy.sh
  19. 58
      storage/remote/write_handler.go
  20. 110
      storage/remote/write_test.go
  21. 21
      web/api/v1/api.go
  22. 2
      web/api/v1/errors_test.go
  23. 5
      web/web.go

1
.github/CODEOWNERS

@ -1,6 +1,7 @@
/web/ui @juliusv
/web/ui/module @juliusv @nexucis
/storage/remote @csmarchbanks @cstyan @bwplotka @tomwilkie
/storage/remote/otlptranslator @gouthamve @jesusvazquez
/discovery/kubernetes @brancz
/tsdb @jesusvazquez
/promql @roidelapluie

3
.golangci.yml

@ -3,6 +3,9 @@ run:
skip-files:
# Skip autogenerated files.
- ^.*\.(pb|y)\.go$
skip-dirs:
# Copied it from a different source
- storage/remote/otlptranslator/prometheusremotewrite
output:
sort-results: true

5
cmd/prometheus/main.go

@ -170,6 +170,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "remote-write-receiver":
c.web.EnableRemoteWriteReceiver = true
level.Warn(logger).Log("msg", "Remote write receiver enabled via feature flag remote-write-receiver. This is DEPRECATED. Use --web.enable-remote-write-receiver.")
case "otlp-write-receiver":
c.web.EnableOTLPWriteReceiver = true
level.Info(logger).Log("msg", "Experimental OTLP write receiver enabled")
case "expand-external-labels":
c.enableExpandExternalLabels = true
level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
@ -420,7 +423,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)

2
docs/command-line/prometheus.md

@ -52,7 +52,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

3
go.mod

@ -54,6 +54,8 @@ require (
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c
github.com/stretchr/testify v1.8.4
github.com/vultr/govultr/v2 v2.17.2
go.opentelemetry.io/collector/pdata v0.66.0
go.opentelemetry.io/collector/semconv v0.81.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0
@ -64,6 +66,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.2
go.uber.org/goleak v1.2.1
go.uber.org/multierr v1.8.0
golang.org/x/net v0.12.0
golang.org/x/oauth2 v0.10.0
golang.org/x/sync v0.3.0

9
go.sum

@ -448,7 +448,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -797,6 +797,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/collector/pdata v0.66.0 h1:UdE5U6MsDNzuiWaXdjGx2lC3ElVqWmN/hiUE8vyvSuM=
go.opentelemetry.io/collector/pdata v0.66.0/go.mod h1:pqyaznLzk21m+1KL6fwOsRryRELL+zNM0qiVSn0MbVc=
go.opentelemetry.io/collector/semconv v0.81.0 h1:lCYNNo3powDvFIaTPP2jDKIrBiV1T92NK4QgL/aHYXw=
go.opentelemetry.io/collector/semconv v0.81.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
@ -820,6 +824,7 @@ go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lI
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
@ -828,6 +833,8 @@ go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=

64
storage/remote/codec.go

@ -14,6 +14,7 @@
package remote
import (
"compress/gzip"
"errors"
"fmt"
"io"
@ -26,6 +27,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/exemplar"
@ -38,8 +40,13 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
)
// decodeReadLimit is the maximum size of a read request body in bytes.
const decodeReadLimit = 32 * 1024 * 1024
const (
// decodeReadLimit is the maximum size of a read request body in bytes.
decodeReadLimit = 32 * 1024 * 1024
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)
type HTTPError struct {
msg string
@ -806,3 +813,56 @@ func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) {
return &req, nil
}
func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error) {
contentType := r.Header.Get("Content-Type")
var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error)
switch contentType {
case pbContentType:
decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
return req, req.UnmarshalProto(buf)
}
case jsonContentType:
decoderFunc = func(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
return req, req.UnmarshalJSON(buf)
}
default:
return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
}
reader := r.Body
// Handle compression.
switch r.Header.Get("Content-Encoding") {
case "gzip":
gr, err := gzip.NewReader(reader)
if err != nil {
return pmetricotlp.NewExportRequest(), err
}
reader = gr
case "":
// No compression.
default:
return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s. Only \"gzip\" or no compression supported", r.Header.Get("Content-Encoding"))
}
body, err := io.ReadAll(reader)
if err != nil {
r.Body.Close()
return pmetricotlp.NewExportRequest(), err
}
if err = r.Body.Close(); err != nil {
return pmetricotlp.NewExportRequest(), err
}
otlpReq, err := decoderFunc(body)
if err != nil {
return pmetricotlp.NewExportRequest(), err
}
return otlpReq, nil
}

23
storage/remote/otlptranslator/README.md

@ -0,0 +1,23 @@
## Copying from opentelemetry/opentelemetry-collector-contrib
This files in the `prometheus/` and `prometheusremotewrite/` are copied from the OpenTelemetry Project[^1].
This is done instead of adding a go.mod dependency because OpenTelemetry depends on `prometheus/prometheus` and a cyclic dependency will be created. This is just a temporary solution and the long-term solution is to move the required packages from OpenTelemetry into `prometheus/prometheus`.
We don't copy in `./prometheus` through this script because that package imports a collector specific featuregate package we don't want to import. The featuregate package is being removed now, and in the future we will copy this folder too.
To update the dependency is a multi-step process:
1. Vendor the latest `prometheus/prometheus`@`main` into [`opentelemetry/opentelemetry-collector-contrib`](https://github.com/open-telemetry/opentelemetry-collector-contrib)
1. Update the VERSION in `update-copy.sh`.
1. Run `./update-copy.sh`.
### Why copy?
This is because the packages we copy depend on the [`prompb`](https://github.com/prometheus/prometheus/blob/main/prompb) package. While the package is relatively stable, there are still changes. For example, https://github.com/prometheus/prometheus/pull/11935 changed the types.
This means if we depend on the upstream packages directly, we will never able to make the changes like above. Hence we're copying the code for now.
### I need to manually change these files
When we do want to make changes to the types in `prompb`, we might need to edit the files directly. That is OK, please let @gouthamve or @jesusvazquez know so they can take care of updating the upstream code (by vendoring in `prometheus/prometheus` upstream and resolving conflicts) and then will run the copy
script again to keep things updated.
[^1]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus and https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheusremotewrite

41
storage/remote/otlptranslator/prometheus/normalize_label.go

@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package normalize
import (
"strings"
"unicode"
)
// Normalizes the specified label to follow Prometheus label names standard
//
// See rules at https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
//
// Labels that start with non-letter rune will be prefixed with "key_"
//
// Exception is made for double-underscores which are allowed
func NormalizeLabel(label string) string {
// Trivial case
if len(label) == 0 {
return label
}
// Replace all non-alphanumeric runes with underscores
label = strings.Map(sanitizeRune, label)
// If label starts with a number, prepend with "key_"
if unicode.IsDigit(rune(label[0])) {
label = "key_" + label
}
return label
}
// Return '_' for anything non-alphanumeric
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
return '_'
}

19
storage/remote/otlptranslator/prometheus/normalize_label_test.go

@ -0,0 +1,19 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package normalize
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestSanitizeDropSanitization(t *testing.T) {
require.Equal(t, "", NormalizeLabel(""))
require.Equal(t, "_test", NormalizeLabel("_test"))
require.Equal(t, "key_0test", NormalizeLabel("0test"))
require.Equal(t, "test", NormalizeLabel("test"))
require.Equal(t, "test__", NormalizeLabel("test_/"))
require.Equal(t, "__test", NormalizeLabel("__test"))
}

251
storage/remote/otlptranslator/prometheus/normalize_name.go

@ -0,0 +1,251 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package normalize
import (
"strings"
"unicode"
"go.opentelemetry.io/collector/pdata/pmetric"
)
// The map to translate OTLP units to Prometheus units
// OTLP metrics use the c/s notation as specified at https://ucum.org/ucum.html
// (See also https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/README.md#instrument-units)
// Prometheus best practices for units: https://prometheus.io/docs/practices/naming/#base-units
// OpenMetrics specification for units: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#units-and-base-units
var unitMap = map[string]string{
// Time
"d": "days",
"h": "hours",
"min": "minutes",
"s": "seconds",
"ms": "milliseconds",
"us": "microseconds",
"ns": "nanoseconds",
// Bytes
"By": "bytes",
"KiBy": "kibibytes",
"MiBy": "mebibytes",
"GiBy": "gibibytes",
"TiBy": "tibibytes",
"KBy": "kilobytes",
"MBy": "megabytes",
"GBy": "gigabytes",
"TBy": "terabytes",
"B": "bytes",
"KB": "kilobytes",
"MB": "megabytes",
"GB": "gigabytes",
"TB": "terabytes",
// SI
"m": "meters",
"V": "volts",
"A": "amperes",
"J": "joules",
"W": "watts",
"g": "grams",
// Misc
"Cel": "celsius",
"Hz": "hertz",
"1": "",
"%": "percent",
"$": "dollars",
}
// The map that translates the "per" unit
// Example: s => per second (singular)
var perUnitMap = map[string]string{
"s": "second",
"m": "minute",
"h": "hour",
"d": "day",
"w": "week",
"mo": "month",
"y": "year",
}
// Build a Prometheus-compliant metric name for the specified metric
//
// Metric name is prefixed with specified namespace and underscore (if any).
// Namespace is not cleaned up. Make sure specified namespace follows Prometheus
// naming convention.
//
// See rules at https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
// and https://prometheus.io/docs/practices/naming/#metric-and-label-naming
func BuildPromCompliantName(metric pmetric.Metric, namespace string) string {
// Split metric name in "tokens" (remove all non-alphanumeric)
nameTokens := strings.FieldsFunc(
metric.Name(),
func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) },
)
// Split unit at the '/' if any
unitTokens := strings.SplitN(metric.Unit(), "/", 2)
// Main unit
// Append if not blank, doesn't contain '{}', and is not present in metric name already
if len(unitTokens) > 0 {
mainUnitOtel := strings.TrimSpace(unitTokens[0])
if mainUnitOtel != "" && !strings.ContainsAny(mainUnitOtel, "{}") {
mainUnitProm := CleanUpString(unitMapGetOrDefault(mainUnitOtel))
if mainUnitProm != "" && !contains(nameTokens, mainUnitProm) {
nameTokens = append(nameTokens, mainUnitProm)
}
}
// Per unit
// Append if not blank, doesn't contain '{}', and is not present in metric name already
if len(unitTokens) > 1 && unitTokens[1] != "" {
perUnitOtel := strings.TrimSpace(unitTokens[1])
if perUnitOtel != "" && !strings.ContainsAny(perUnitOtel, "{}") {
perUnitProm := CleanUpString(perUnitMapGetOrDefault(perUnitOtel))
if perUnitProm != "" && !contains(nameTokens, perUnitProm) {
nameTokens = append(append(nameTokens, "per"), perUnitProm)
}
}
}
}
// Append _total for Counters
if metric.Type() == pmetric.MetricTypeSum && metric.Sum().IsMonotonic() {
nameTokens = append(removeItem(nameTokens, "total"), "total")
}
// Append _ratio for metrics with unit "1"
// Some Otel receivers improperly use unit "1" for counters of objects
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aissue+some+metric+units+don%27t+follow+otel+semantic+conventions
// Until these issues have been fixed, we're appending `_ratio` for gauges ONLY
// Theoretically, counters could be ratios as well, but it's absurd (for mathematical reasons)
if metric.Unit() == "1" && metric.Type() == pmetric.MetricTypeGauge {
nameTokens = append(removeItem(nameTokens, "ratio"), "ratio")
}
// Namespace?
if namespace != "" {
nameTokens = append([]string{namespace}, nameTokens...)
}
// Build the string from the tokens, separated with underscores
normalizedName := strings.Join(nameTokens, "_")
// Metric name cannot start with a digit, so prefix it with "_" in this case
if normalizedName != "" && unicode.IsDigit(rune(normalizedName[0])) {
normalizedName = "_" + normalizedName
}
return normalizedName
}
// TrimPromSuffixes trims type and unit prometheus suffixes from a metric name.
// Following the [OpenTelemetry specs] for converting Prometheus Metric points to OTLP.
//
// [OpenTelemetry specs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#metric-metadata
func TrimPromSuffixes(promName string, metricType pmetric.MetricType, unit string) string {
nameTokens := strings.Split(promName, "_")
if len(nameTokens) == 1 {
return promName
}
nameTokens = removeTypeSuffixes(nameTokens, metricType)
nameTokens = removeUnitSuffixes(nameTokens, unit)
return strings.Join(nameTokens, "_")
}
func removeTypeSuffixes(tokens []string, metricType pmetric.MetricType) []string {
switch metricType {
case pmetric.MetricTypeSum:
// Only counters are expected to have a type suffix at this point.
// for other types, suffixes are removed during scrape.
return removeSuffix(tokens, "total")
default:
return tokens
}
}
func removeUnitSuffixes(nameTokens []string, unit string) []string {
l := len(nameTokens)
unitTokens := strings.Split(unit, "_")
lu := len(unitTokens)
if lu == 0 || l <= lu {
return nameTokens
}
suffixed := true
for i := range unitTokens {
if nameTokens[l-i-1] != unitTokens[lu-i-1] {
suffixed = false
break
}
}
if suffixed {
return nameTokens[:l-lu]
}
return nameTokens
}
func removeSuffix(tokens []string, suffix string) []string {
l := len(tokens)
if tokens[l-1] == suffix {
return tokens[:l-1]
}
return tokens
}
// Clean up specified string so it's Prometheus compliant
func CleanUpString(s string) string {
return strings.Join(strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) }), "_")
}
func RemovePromForbiddenRunes(s string) string {
return strings.Join(strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) && r != '_' && r != ':' }), "_")
}
// Retrieve the Prometheus "basic" unit corresponding to the specified "basic" unit
// Returns the specified unit if not found in unitMap
func unitMapGetOrDefault(unit string) string {
if promUnit, ok := unitMap[unit]; ok {
return promUnit
}
return unit
}
// Retrieve the Prometheus "per" unit corresponding to the specified "per" unit
// Returns the specified unit if not found in perUnitMap
func perUnitMapGetOrDefault(perUnit string) string {
if promPerUnit, ok := perUnitMap[perUnit]; ok {
return promPerUnit
}
return perUnit
}
// Returns whether the slice contains the specified value
func contains(slice []string, value string) bool {
for _, sliceEntry := range slice {
if sliceEntry == value {
return true
}
}
return false
}
// Remove the specified value from the slice
func removeItem(slice []string, value string) []string {
newSlice := make([]string, 0, len(slice))
for _, sliceEntry := range slice {
if sliceEntry != value {
newSlice = append(newSlice, sliceEntry)
}
}
return newSlice
}

180
storage/remote/otlptranslator/prometheus/normalize_name_test.go

@ -0,0 +1,180 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package normalize
import (
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
)
func TestByte(t *testing.T) {
require.Equal(t, "system_filesystem_usage_bytes", BuildPromCompliantName(createGauge("system.filesystem.usage", "By"), ""))
}
func TestByteCounter(t *testing.T) {
require.Equal(t, "system_io_bytes_total", BuildPromCompliantName(createCounter("system.io", "By"), ""))
require.Equal(t, "network_transmitted_bytes_total", BuildPromCompliantName(createCounter("network_transmitted_bytes_total", "By"), ""))
}
func TestWhiteSpaces(t *testing.T) {
require.Equal(t, "system_filesystem_usage_bytes", BuildPromCompliantName(createGauge("\t system.filesystem.usage ", " By\t"), ""))
}
func TestNonStandardUnit(t *testing.T) {
require.Equal(t, "system_network_dropped", BuildPromCompliantName(createGauge("system.network.dropped", "{packets}"), ""))
}
func TestNonStandardUnitCounter(t *testing.T) {
require.Equal(t, "system_network_dropped_total", BuildPromCompliantName(createCounter("system.network.dropped", "{packets}"), ""))
}
func TestBrokenUnit(t *testing.T) {
require.Equal(t, "system_network_dropped_packets", BuildPromCompliantName(createGauge("system.network.dropped", "packets"), ""))
require.Equal(t, "system_network_packets_dropped", BuildPromCompliantName(createGauge("system.network.packets.dropped", "packets"), ""))
require.Equal(t, "system_network_packets", BuildPromCompliantName(createGauge("system.network.packets", "packets"), ""))
}
func TestBrokenUnitCounter(t *testing.T) {
require.Equal(t, "system_network_dropped_packets_total", BuildPromCompliantName(createCounter("system.network.dropped", "packets"), ""))
require.Equal(t, "system_network_packets_dropped_total", BuildPromCompliantName(createCounter("system.network.packets.dropped", "packets"), ""))
require.Equal(t, "system_network_packets_total", BuildPromCompliantName(createCounter("system.network.packets", "packets"), ""))
}
func TestRatio(t *testing.T) {
require.Equal(t, "hw_gpu_memory_utilization_ratio", BuildPromCompliantName(createGauge("hw.gpu.memory.utilization", "1"), ""))
require.Equal(t, "hw_fan_speed_ratio", BuildPromCompliantName(createGauge("hw.fan.speed_ratio", "1"), ""))
require.Equal(t, "objects_total", BuildPromCompliantName(createCounter("objects", "1"), ""))
}
func TestHertz(t *testing.T) {
require.Equal(t, "hw_cpu_speed_limit_hertz", BuildPromCompliantName(createGauge("hw.cpu.speed_limit", "Hz"), ""))
}
func TestPer(t *testing.T) {
require.Equal(t, "broken_metric_speed_km_per_hour", BuildPromCompliantName(createGauge("broken.metric.speed", "km/h"), ""))
require.Equal(t, "astro_light_speed_limit_meters_per_second", BuildPromCompliantName(createGauge("astro.light.speed_limit", "m/s"), ""))
}
func TestPercent(t *testing.T) {
require.Equal(t, "broken_metric_success_ratio_percent", BuildPromCompliantName(createGauge("broken.metric.success_ratio", "%"), ""))
require.Equal(t, "broken_metric_success_percent", BuildPromCompliantName(createGauge("broken.metric.success_percent", "%"), ""))
}
func TestDollar(t *testing.T) {
require.Equal(t, "crypto_bitcoin_value_dollars", BuildPromCompliantName(createGauge("crypto.bitcoin.value", "$"), ""))
require.Equal(t, "crypto_bitcoin_value_dollars", BuildPromCompliantName(createGauge("crypto.bitcoin.value.dollars", "$"), ""))
}
func TestEmpty(t *testing.T) {
require.Equal(t, "test_metric_no_unit", BuildPromCompliantName(createGauge("test.metric.no_unit", ""), ""))
require.Equal(t, "test_metric_spaces", BuildPromCompliantName(createGauge("test.metric.spaces", " \t "), ""))
}
func TestUnsupportedRunes(t *testing.T) {
require.Equal(t, "unsupported_metric_temperature_F", BuildPromCompliantName(createGauge("unsupported.metric.temperature", "°F"), ""))
require.Equal(t, "unsupported_metric_weird", BuildPromCompliantName(createGauge("unsupported.metric.weird", "+=.:,!* & #"), ""))
require.Equal(t, "unsupported_metric_redundant_test_per_C", BuildPromCompliantName(createGauge("unsupported.metric.redundant", "__test $/°C"), ""))
}
func TestOtelReceivers(t *testing.T) {
require.Equal(t, "active_directory_ds_replication_network_io_bytes_total", BuildPromCompliantName(createCounter("active_directory.ds.replication.network.io", "By"), ""))
require.Equal(t, "active_directory_ds_replication_sync_object_pending_total", BuildPromCompliantName(createCounter("active_directory.ds.replication.sync.object.pending", "{objects}"), ""))
require.Equal(t, "active_directory_ds_replication_object_rate_per_second", BuildPromCompliantName(createGauge("active_directory.ds.replication.object.rate", "{objects}/s"), ""))
require.Equal(t, "active_directory_ds_name_cache_hit_rate_percent", BuildPromCompliantName(createGauge("active_directory.ds.name_cache.hit_rate", "%"), ""))
require.Equal(t, "active_directory_ds_ldap_bind_last_successful_time_milliseconds", BuildPromCompliantName(createGauge("active_directory.ds.ldap.bind.last_successful.time", "ms"), ""))
require.Equal(t, "apache_current_connections", BuildPromCompliantName(createGauge("apache.current_connections", "connections"), ""))
require.Equal(t, "apache_workers_connections", BuildPromCompliantName(createGauge("apache.workers", "connections"), ""))
require.Equal(t, "apache_requests_total", BuildPromCompliantName(createCounter("apache.requests", "1"), ""))
require.Equal(t, "bigip_virtual_server_request_count_total", BuildPromCompliantName(createCounter("bigip.virtual_server.request.count", "{requests}"), ""))
require.Equal(t, "system_cpu_utilization_ratio", BuildPromCompliantName(createGauge("system.cpu.utilization", "1"), ""))
require.Equal(t, "system_disk_operation_time_seconds_total", BuildPromCompliantName(createCounter("system.disk.operation_time", "s"), ""))
require.Equal(t, "system_cpu_load_average_15m_ratio", BuildPromCompliantName(createGauge("system.cpu.load_average.15m", "1"), ""))
require.Equal(t, "memcached_operation_hit_ratio_percent", BuildPromCompliantName(createGauge("memcached.operation_hit_ratio", "%"), ""))
require.Equal(t, "mongodbatlas_process_asserts_per_second", BuildPromCompliantName(createGauge("mongodbatlas.process.asserts", "{assertions}/s"), ""))
require.Equal(t, "mongodbatlas_process_journaling_data_files_mebibytes", BuildPromCompliantName(createGauge("mongodbatlas.process.journaling.data_files", "MiBy"), ""))
require.Equal(t, "mongodbatlas_process_network_io_bytes_per_second", BuildPromCompliantName(createGauge("mongodbatlas.process.network.io", "By/s"), ""))
require.Equal(t, "mongodbatlas_process_oplog_rate_gibibytes_per_hour", BuildPromCompliantName(createGauge("mongodbatlas.process.oplog.rate", "GiBy/h"), ""))
require.Equal(t, "mongodbatlas_process_db_query_targeting_scanned_per_returned", BuildPromCompliantName(createGauge("mongodbatlas.process.db.query_targeting.scanned_per_returned", "{scanned}/{returned}"), ""))
require.Equal(t, "nginx_requests", BuildPromCompliantName(createGauge("nginx.requests", "requests"), ""))
require.Equal(t, "nginx_connections_accepted", BuildPromCompliantName(createGauge("nginx.connections_accepted", "connections"), ""))
require.Equal(t, "nsxt_node_memory_usage_kilobytes", BuildPromCompliantName(createGauge("nsxt.node.memory.usage", "KBy"), ""))
require.Equal(t, "redis_latest_fork_microseconds", BuildPromCompliantName(createGauge("redis.latest_fork", "us"), ""))
}
func TestTrimPromSuffixes(t *testing.T) {
require.Equal(t, "active_directory_ds_replication_network_io", TrimPromSuffixes("active_directory_ds_replication_network_io_bytes_total", pmetric.MetricTypeSum, "bytes"))
require.Equal(t, "active_directory_ds_name_cache_hit_rate", TrimPromSuffixes("active_directory_ds_name_cache_hit_rate_percent", pmetric.MetricTypeGauge, "percent"))
require.Equal(t, "active_directory_ds_ldap_bind_last_successful_time", TrimPromSuffixes("active_directory_ds_ldap_bind_last_successful_time_milliseconds", pmetric.MetricTypeGauge, "milliseconds"))
require.Equal(t, "apache_requests", TrimPromSuffixes("apache_requests_total", pmetric.MetricTypeSum, "1"))
require.Equal(t, "system_cpu_utilization", TrimPromSuffixes("system_cpu_utilization_ratio", pmetric.MetricTypeGauge, "ratio"))
require.Equal(t, "mongodbatlas_process_journaling_data_files", TrimPromSuffixes("mongodbatlas_process_journaling_data_files_mebibytes", pmetric.MetricTypeGauge, "mebibytes"))
require.Equal(t, "mongodbatlas_process_network_io", TrimPromSuffixes("mongodbatlas_process_network_io_bytes_per_second", pmetric.MetricTypeGauge, "bytes_per_second"))
require.Equal(t, "mongodbatlas_process_oplog_rate", TrimPromSuffixes("mongodbatlas_process_oplog_rate_gibibytes_per_hour", pmetric.MetricTypeGauge, "gibibytes_per_hour"))
require.Equal(t, "nsxt_node_memory_usage", TrimPromSuffixes("nsxt_node_memory_usage_kilobytes", pmetric.MetricTypeGauge, "kilobytes"))
require.Equal(t, "redis_latest_fork", TrimPromSuffixes("redis_latest_fork_microseconds", pmetric.MetricTypeGauge, "microseconds"))
require.Equal(t, "up", TrimPromSuffixes("up", pmetric.MetricTypeGauge, ""))
// These are not necessarily valid OM units, only tested for the sake of completeness.
require.Equal(t, "active_directory_ds_replication_sync_object_pending", TrimPromSuffixes("active_directory_ds_replication_sync_object_pending_total", pmetric.MetricTypeSum, "{objects}"))
require.Equal(t, "apache_current", TrimPromSuffixes("apache_current_connections", pmetric.MetricTypeGauge, "connections"))
require.Equal(t, "bigip_virtual_server_request_count", TrimPromSuffixes("bigip_virtual_server_request_count_total", pmetric.MetricTypeSum, "{requests}"))
require.Equal(t, "mongodbatlas_process_db_query_targeting_scanned_per_returned", TrimPromSuffixes("mongodbatlas_process_db_query_targeting_scanned_per_returned", pmetric.MetricTypeGauge, "{scanned}/{returned}"))
require.Equal(t, "nginx_connections_accepted", TrimPromSuffixes("nginx_connections_accepted", pmetric.MetricTypeGauge, "connections"))
require.Equal(t, "apache_workers", TrimPromSuffixes("apache_workers_connections", pmetric.MetricTypeGauge, "connections"))
require.Equal(t, "nginx", TrimPromSuffixes("nginx_requests", pmetric.MetricTypeGauge, "requests"))
// Units shouldn't be trimmed if the unit is not a direct match with the suffix, i.e, a suffix "_seconds" shouldn't be removed if unit is "sec" or "s"
require.Equal(t, "system_cpu_load_average_15m_ratio", TrimPromSuffixes("system_cpu_load_average_15m_ratio", pmetric.MetricTypeGauge, "1"))
require.Equal(t, "mongodbatlas_process_asserts_per_second", TrimPromSuffixes("mongodbatlas_process_asserts_per_second", pmetric.MetricTypeGauge, "{assertions}/s"))
require.Equal(t, "memcached_operation_hit_ratio_percent", TrimPromSuffixes("memcached_operation_hit_ratio_percent", pmetric.MetricTypeGauge, "%"))
require.Equal(t, "active_directory_ds_replication_object_rate_per_second", TrimPromSuffixes("active_directory_ds_replication_object_rate_per_second", pmetric.MetricTypeGauge, "{objects}/s"))
require.Equal(t, "system_disk_operation_time_seconds", TrimPromSuffixes("system_disk_operation_time_seconds_total", pmetric.MetricTypeSum, "s"))
}
func TestNamespace(t *testing.T) {
require.Equal(t, "space_test", BuildPromCompliantName(createGauge("test", ""), "space"))
require.Equal(t, "space_test", BuildPromCompliantName(createGauge("#test", ""), "space"))
}
func TestCleanUpString(t *testing.T) {
require.Equal(t, "", CleanUpString(""))
require.Equal(t, "a_b", CleanUpString("a b"))
require.Equal(t, "hello_world", CleanUpString("hello, world!"))
require.Equal(t, "hello_you_2", CleanUpString("hello you 2"))
require.Equal(t, "1000", CleanUpString("$1000"))
require.Equal(t, "", CleanUpString("*+$^=)"))
}
func TestUnitMapGetOrDefault(t *testing.T) {
require.Equal(t, "", unitMapGetOrDefault(""))
require.Equal(t, "seconds", unitMapGetOrDefault("s"))
require.Equal(t, "invalid", unitMapGetOrDefault("invalid"))
}
func TestPerUnitMapGetOrDefault(t *testing.T) {
require.Equal(t, "", perUnitMapGetOrDefault(""))
require.Equal(t, "second", perUnitMapGetOrDefault("s"))
require.Equal(t, "invalid", perUnitMapGetOrDefault("invalid"))
}
func TestRemoveItem(t *testing.T) {
require.Equal(t, []string{}, removeItem([]string{}, "test"))
require.Equal(t, []string{}, removeItem([]string{}, ""))
require.Equal(t, []string{"a", "b", "c"}, removeItem([]string{"a", "b", "c"}, "d"))
require.Equal(t, []string{"a", "b", "c"}, removeItem([]string{"a", "b", "c"}, ""))
require.Equal(t, []string{"a", "b"}, removeItem([]string{"a", "b", "c"}, "c"))
require.Equal(t, []string{"a", "c"}, removeItem([]string{"a", "b", "c"}, "b"))
require.Equal(t, []string{"b", "c"}, removeItem([]string{"a", "b", "c"}, "a"))
}
func TestBuildPromCompliantName(t *testing.T) {
require.Equal(t, "system_io_bytes_total", BuildPromCompliantName(createCounter("system.io", "By"), ""))
require.Equal(t, "system_network_io_bytes_total", BuildPromCompliantName(createCounter("network.io", "By"), "system"))
require.Equal(t, "_3_14_digits", BuildPromCompliantName(createGauge("3.14 digits", ""), ""))
require.Equal(t, "envoy_rule_engine_zlib_buf_error", BuildPromCompliantName(createGauge("envoy__rule_engine_zlib_buf_error", ""), ""))
require.Equal(t, "foo_bar", BuildPromCompliantName(createGauge(":foo::bar", ""), ""))
require.Equal(t, "foo_bar_total", BuildPromCompliantName(createCounter(":foo::bar", ""), ""))
}

34
storage/remote/otlptranslator/prometheus/testutils_test.go

@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package normalize
import (
"go.opentelemetry.io/collector/pdata/pmetric"
)
var ilm pmetric.ScopeMetrics
func init() {
metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
ilm = resourceMetrics.ScopeMetrics().AppendEmpty()
}
// Returns a new Metric of type "Gauge" with specified name and unit
func createGauge(name, unit string) pmetric.Metric {
gauge := ilm.Metrics().AppendEmpty()
gauge.SetName(name)
gauge.SetUnit(unit)
gauge.SetEmptyGauge()
return gauge
}
// Returns a new Metric of type Monotonic Sum with specified name and unit
func createCounter(name, unit string) pmetric.Metric {
counter := ilm.Metrics().AppendEmpty()
counter.SetEmptySum().SetIsMonotonic(true)
counter.SetName(name)
counter.SetUnit(unit)
return counter
}

559
storage/remote/otlptranslator/prometheusremotewrite/helper.go

@ -0,0 +1,559 @@
// DO NOT EDIT. COPIED AS-IS. SEE README.md
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
import (
"encoding/hex"
"fmt"
"log"
"math"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
)
const (
nameStr = "__name__"
sumStr = "_sum"
countStr = "_count"
bucketStr = "_bucket"
leStr = "le"
quantileStr = "quantile"
pInfStr = "+Inf"
createdSuffix = "_created"
// maxExemplarRunes is the maximum number of UTF-8 exemplar characters
// according to the prometheus specification
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
maxExemplarRunes = 128
// Trace and Span id keys are defined as part of the spec:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2
traceIDKey = "trace_id"
spanIDKey = "span_id"
infoType = "info"
targetMetricName = "target_info"
)
type bucketBoundsData struct {
sig string
bound float64
}
// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds
type byBucketBoundsData []bucketBoundsData
func (m byBucketBoundsData) Len() int { return len(m) }
func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound }
func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
// ByLabelName enables the usage of sort.Sort() with a slice of labels
type ByLabelName []prompb.Label
func (a ByLabelName) Len() int { return len(a) }
func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it
// creates a new TimeSeries in the map if not found and returns the time series signature.
// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil.
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
datatype string) string {
if sample == nil || labels == nil || tsMap == nil {
return ""
}
sig := timeSeriesSignature(datatype, &labels)
ts, ok := tsMap[sig]
if ok {
ts.Samples = append(ts.Samples, *sample)
} else {
newTs := &prompb.TimeSeries{
Labels: labels,
Samples: []prompb.Sample{*sample},
}
tsMap[sig] = newTs
}
return sig
}
// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig;
// we only add exemplars if samples are presents
// tsMap is unmodified if either of its parameters is nil and samples are nil.
func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
if tsMap == nil || bucketBoundsData == nil || exemplars == nil {
return
}
sort.Sort(byBucketBoundsData(bucketBoundsData))
for _, exemplar := range exemplars {
addExemplar(tsMap, bucketBoundsData, exemplar)
}
}
func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) {
for _, bucketBound := range bucketBounds {
sig := bucketBound.sig
bound := bucketBound.bound
_, ok := tsMap[sig]
if ok {
if tsMap[sig].Samples != nil {
if exemplar.Value <= bound {
tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar)
return
}
}
}
}
}
// timeSeries return a string signature in the form of:
//
// TYPE-label1-value1- ... -labelN-valueN
//
// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
// the signature.
func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
b := strings.Builder{}
b.WriteString(datatype)
sort.Sort(ByLabelName(*labels))
for _, lb := range *labels {
b.WriteString("-")
b.WriteString(lb.GetName())
b.WriteString("-")
b.WriteString(lb.GetValue())
}
return b.String()
}
// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values.
// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is
// logged. Resultant label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label {
// map ensures no duplicate label name
l := map[string]prompb.Label{}
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, attributes.Len())
attributes.Range(func(key string, value pcommon.Value) bool {
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
return true
})
sort.Stable(ByLabelName(labels))
for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingLabel, alreadyExists := l[finalKey]; alreadyExists {
existingLabel.Value = existingLabel.Value + ";" + label.Value
l[finalKey] = existingLabel
} else {
l[finalKey] = prompb.Label{
Name: finalKey,
Value: label.Value,
}
}
}
// Map service.name + service.namespace to job
if serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName); ok {
val := serviceName.AsString()
if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok {
val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
}
l[model.JobLabel] = prompb.Label{
Name: model.JobLabel,
Value: val,
}
}
// Map service.instance.id to instance
if instance, ok := resource.Attributes().Get(conventions.AttributeServiceInstanceID); ok {
l[model.InstanceLabel] = prompb.Label{
Name: model.InstanceLabel,
Value: instance.AsString(),
}
}
for key, value := range externalLabels {
// External labels have already been sanitized
if _, alreadyExists := l[key]; alreadyExists {
// Skip external labels if they are overridden by metric attributes
continue
}
l[key] = prompb.Label{
Name: key,
Value: value,
}
}
for i := 0; i < len(extras); i += 2 {
if i+1 >= len(extras) {
break
}
_, found := l[extras[i]]
if found {
log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
}
// internal labels should be maintained
name := extras[i]
if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") {
name = prometheustranslator.NormalizeLabel(name)
}
l[name] = prompb.Label{
Name: name,
Value: extras[i+1],
}
}
s := make([]prompb.Label, 0, len(l))
for _, lb := range l {
s = append(s, lb)
}
return s
}
// isValidAggregationTemporality checks whether an OTel metric has a valid
// aggregation temporality for conversion to a Prometheus metric.
func isValidAggregationTemporality(metric pmetric.Metric) bool {
switch metric.Type() {
case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
return true
case pmetric.MetricTypeSum:
return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeHistogram:
return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
case pmetric.MetricTypeExponentialHistogram:
return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
}
return false
}
// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It
// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts)
func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) {
timestamp := convertTimeStamp(pt.Timestamp())
// sum, count, and buckets of the histogram should append suffix to baseName
baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
// If the sum is unset, it indicates the _sum metric point should be
// omitted
if pt.HasSum() {
// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Value: pt.Sum(),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
sum.Value = math.Float64frombits(value.StaleNaN)
}
sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric.Type().String())
}
// treat count as a sample in an individual TimeSeries
count := &prompb.Sample{
Value: float64(pt.Count()),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
count.Value = math.Float64frombits(value.StaleNaN)
}
countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric.Type().String())
// cumulative count for conversion to cumulative histogram
var cumulativeCount uint64
promExemplars := getPromExemplars[pmetric.HistogramDataPoint](pt)
var bucketBounds []bucketBoundsData
// process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1
for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ {
bound := pt.ExplicitBounds().At(i)
cumulativeCount += pt.BucketCounts().At(i)
bucket := &prompb.Sample{
Value: float64(cumulativeCount),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
bucket.Value = math.Float64frombits(value.StaleNaN)
}
boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
sig := addSample(tsMap, bucket, labels, metric.Type().String())
bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
}
// add le=+Inf bucket
infBucket := &prompb.Sample{
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
infBucket.Value = math.Float64frombits(value.StaleNaN)
} else {
infBucket.Value = float64(pt.Count())
}
infLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
sig := addSample(tsMap, infBucket, infLabels, metric.Type().String())
bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)})
addExemplars(tsMap, promExemplars, bucketBounds)
// add _created time series if needed
startTimestamp := pt.StartTimestamp()
if settings.ExportCreatedMetric && startTimestamp != 0 {
createdLabels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
nameStr,
baseName+createdSuffix,
)
addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
}
}
type exemplarType interface {
pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint
Exemplars() pmetric.ExemplarSlice
}
func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
var promExemplars []prompb.Exemplar
for i := 0; i < pt.Exemplars().Len(); i++ {
exemplar := pt.Exemplars().At(i)
exemplarRunes := 0
promExemplar := &prompb.Exemplar{
Value: exemplar.DoubleValue(),
Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()),
}
if traceID := exemplar.TraceID(); !traceID.IsEmpty() {
val := hex.EncodeToString(traceID[:])
exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
Name: traceIDKey,
Value: val,
}
promExemplar.Labels = append(promExemplar.Labels, promLabel)
}
if spanID := exemplar.SpanID(); !spanID.IsEmpty() {
val := hex.EncodeToString(spanID[:])
exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
Name: spanIDKey,
Value: val,
}
promExemplar.Labels = append(promExemplar.Labels, promLabel)
}
var labelsFromAttributes []prompb.Label
exemplar.FilteredAttributes().Range(func(key string, value pcommon.Value) bool {
val := value.AsString()
exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val)
promLabel := prompb.Label{
Name: key,
Value: val,
}
labelsFromAttributes = append(labelsFromAttributes, promLabel)
return true
})
if exemplarRunes <= maxExemplarRunes {
// only append filtered attributes if it does not cause exemplar
// labels to exceed the max number of runes
promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...)
}
promExemplars = append(promExemplars, *promExemplar)
}
return promExemplars
}
// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics
func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
var ts pcommon.Timestamp
// handle individual metric based on type
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
for x := 0; x < dataPoints.Len(); x++ {
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
}
}
return ts
}
func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
if a > b {
return a
}
return b
}
// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings,
tsMap map[string]*prompb.TimeSeries) {
timestamp := convertTimeStamp(pt.Timestamp())
// sum and count of the summary should append suffix to baseName
baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Value: pt.Sum(),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
sum.Value = math.Float64frombits(value.StaleNaN)
}
sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
addSample(tsMap, sum, sumlabels, metric.Type().String())
// treat count as a sample in an individual TimeSeries
count := &prompb.Sample{
Value: float64(pt.Count()),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
count.Value = math.Float64frombits(value.StaleNaN)
}
countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
addSample(tsMap, count, countlabels, metric.Type().String())
// process each percentile/quantile
for i := 0; i < pt.QuantileValues().Len(); i++ {
qt := pt.QuantileValues().At(i)
quantile := &prompb.Sample{
Value: qt.Value(),
Timestamp: timestamp,
}
if pt.Flags().NoRecordedValue() {
quantile.Value = math.Float64frombits(value.StaleNaN)
}
percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
qtlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName, quantileStr, percentileStr)
addSample(tsMap, quantile, qtlabels, metric.Type().String())
}
// add _created time series if needed
startTimestamp := pt.StartTimestamp()
if settings.ExportCreatedMetric && startTimestamp != 0 {
createdLabels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
nameStr,
baseName+createdSuffix,
)
addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
}
}
// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single
// sample. If the series exists, then new samples won't be added.
func addCreatedTimeSeriesIfNeeded(
series map[string]*prompb.TimeSeries,
labels []prompb.Label,
startTimestamp pcommon.Timestamp,
metricType string,
) {
sig := timeSeriesSignature(metricType, &labels)
if _, ok := series[sig]; !ok {
series[sig] = &prompb.TimeSeries{
Labels: labels,
Samples: []prompb.Sample{
{ // convert ns to ms
Value: float64(convertTimeStamp(startTimestamp)),
},
},
}
}
}
// addResourceTargetInfo converts the resource to the target info metric
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*prompb.TimeSeries) {
if settings.DisableTargetInfo {
return
}
// Use resource attributes (other than those used for job+instance) as the
// metric labels for the target info metric
attributes := pcommon.NewMap()
resource.Attributes().CopyTo(attributes)
attributes.RemoveIf(func(k string, _ pcommon.Value) bool {
switch k {
case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID:
// Remove resource attributes used for job + instance
return true
default:
return false
}
})
if attributes.Len() == 0 {
// If we only have job + instance, then target_info isn't useful, so don't add it.
return
}
// create parameters for addSample
name := targetMetricName
if len(settings.Namespace) > 0 {
name = settings.Namespace + "_" + name
}
labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name)
sample := &prompb.Sample{
Value: float64(1),
// convert ns to ms
Timestamp: convertTimeStamp(timestamp),
}
addSample(tsMap, sample, labels, infoType)
}
// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms
func convertTimeStamp(timestamp pcommon.Timestamp) int64 {
return timestamp.AsTime().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}

158
storage/remote/otlptranslator/prometheusremotewrite/histograms.go

@ -0,0 +1,158 @@
// DO NOT EDIT. COPIED AS-IS. SEE README.md
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
import (
"fmt"
"math"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)
const defaultZeroThreshold = 1e-128
func addSingleExponentialHistogramDataPoint(
metric string,
pt pmetric.ExponentialHistogramDataPoint,
resource pcommon.Resource,
settings Settings,
series map[string]*prompb.TimeSeries,
) error {
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, metric,
)
sig := timeSeriesSignature(
pmetric.MetricTypeExponentialHistogram.String(),
&labels,
)
ts, ok := series[sig]
if !ok {
ts = &prompb.TimeSeries{
Labels: labels,
}
series[sig] = ts
}
histogram, err := exponentialToNativeHistogram(pt)
if err != nil {
return err
}
ts.Histograms = append(ts.Histograms, histogram)
exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt)
ts.Exemplars = append(ts.Exemplars, exemplars...)
return nil
}
// exponentialToNativeHistogram translates OTel Exponential Histogram data point
// to Prometheus Native Histogram.
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, error) {
scale := p.Scale()
if scale < -4 || scale > 8 {
return prompb.Histogram{},
fmt.Errorf("cannot convert exponential to native histogram."+
" Scale must be <= 8 and >= -4, was %d", scale)
// TODO: downscale to 8 if scale > 8
}
pSpans, pDeltas := convertBucketsLayout(p.Positive())
nSpans, nDeltas := convertBucketsLayout(p.Negative())
h := prompb.Histogram{
Schema: scale,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()},
// TODO use zero_threshold, if set, see
// https://github.com/open-telemetry/opentelemetry-proto/pull/441
ZeroThreshold: defaultZeroThreshold,
PositiveSpans: pSpans,
PositiveDeltas: pDeltas,
NegativeSpans: nSpans,
NegativeDeltas: nDeltas,
Timestamp: convertTimeStamp(p.Timestamp()),
}
if p.Flags().NoRecordedValue() {
h.Sum = math.Float64frombits(value.StaleNaN)
h.Count = &prompb.Histogram_CountInt{CountInt: value.StaleNaN}
} else {
if p.HasSum() {
h.Sum = p.Sum()
}
h.Count = &prompb.Histogram_CountInt{CountInt: p.Count()}
}
return h, nil
}
// convertBucketsLayout translates OTel Exponential Histogram dense buckets
// representation to Prometheus Native Histogram sparse bucket representation.
//
// The translation logic is taken from the client_golang `histogram.go#makeBuckets`
// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go
// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket
// index 0 corresponds to the range (1, base] while Prometheus bucket index 0
// to the range (base 1].
func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets) ([]prompb.BucketSpan, []int64) {
bucketCounts := buckets.BucketCounts()
if bucketCounts.Len() == 0 {
return nil, nil
}
var (
spans []prompb.BucketSpan
deltas []int64
prevCount int64
nextBucketIdx int32
)
appendDelta := func(count int64) {
spans[len(spans)-1].Length++
deltas = append(deltas, count-prevCount)
prevCount = count
}
for i := 0; i < bucketCounts.Len(); i++ {
count := int64(bucketCounts.At(i))
if count == 0 {
continue
}
// The offset is adjusted by 1 as described above.
bucketIdx := int32(i) + buckets.Offset() + 1
delta := bucketIdx - nextBucketIdx
if i == 0 || delta > 2 {
// We have to create a new span, either because we are
// at the very beginning, or because we have found a gap
// of more than two buckets. The constant 2 is copied from the logic in
// https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296
spans = append(spans, prompb.BucketSpan{
Offset: delta,
Length: 0,
})
} else {
// We have found a small gap (or no gap at all).
// Insert empty buckets as needed.
for j := int32(0); j < delta; j++ {
appendDelta(0)
}
}
appendDelta(count)
nextBucketIdx = bucketIdx + 1
}
return spans, deltas
}

114
storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go

@ -0,0 +1,114 @@
// DO NOT EDIT. COPIED AS-IS. SEE README.md
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
import (
"errors"
"fmt"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
)
type Settings struct {
Namespace string
ExternalLabels map[string]string
DisableTargetInfo bool
ExportCreatedMetric bool
}
// FromMetrics converts pmetric.Metrics to prometheus remote write format.
func FromMetrics(md pmetric.Metrics, settings Settings) (tsMap map[string]*prompb.TimeSeries, errs error) {
tsMap = make(map[string]*prompb.TimeSeries)
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// use with the "target" info metric
var mostRecentTimestamp pcommon.Timestamp
for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
metricSlice := scopeMetrics.Metrics()
// TODO: decide if instrumentation library information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
mostRecentTimestamp = maxTimestamp(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
if !isValidAggregationTemporality(metric) {
errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
continue
}
// handle individual metric based on type
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleGaugeNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleSumNumberDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
for x := 0; x < dataPoints.Len(); x++ {
errs = multierr.Append(
errs,
addSingleExponentialHistogramDataPoint(
name,
dataPoints.At(x),
resource,
settings,
tsMap,
),
)
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, settings, tsMap)
}
default:
errs = multierr.Append(errs, errors.New("unsupported metric type"))
}
}
}
addResourceTargetInfo(resource, settings, mostRecentTimestamp, tsMap)
}
return
}

104
storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go

@ -0,0 +1,104 @@
// DO NOT EDIT. COPIED AS-IS. SEE README.md
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
import (
"math"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
)
// addSingleSumNumberDataPoint converts the Gauge metric data point to a
// Prometheus time series with samples and labels. The result is stored in the
// series map.
func addSingleGaugeNumberDataPoint(
pt pmetric.NumberDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
settings Settings,
series map[string]*prompb.TimeSeries,
) {
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, name,
)
sample := &prompb.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
addSample(series, sample, labels, metric.Type().String())
}
// addSingleSumNumberDataPoint converts the Sum metric data point to a Prometheus
// time series with samples, labels and exemplars. The result is stored in the
// series map.
func addSingleSumNumberDataPoint(
pt pmetric.NumberDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
settings Settings,
series map[string]*prompb.TimeSeries,
) {
name := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
labels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
model.MetricNameLabel, name,
)
sample := &prompb.Sample{
// convert ns to ms
Timestamp: convertTimeStamp(pt.Timestamp()),
}
switch pt.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
sample.Value = float64(pt.IntValue())
case pmetric.NumberDataPointValueTypeDouble:
sample.Value = pt.DoubleValue()
}
if pt.Flags().NoRecordedValue() {
sample.Value = math.Float64frombits(value.StaleNaN)
}
sig := addSample(series, sample, labels, metric.Type().String())
if ts, ok := series[sig]; sig != "" && ok {
exemplars := getPromExemplars[pmetric.NumberDataPoint](pt)
ts.Exemplars = append(ts.Exemplars, exemplars...)
}
// add _created time series if needed
if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() {
startTimestamp := pt.StartTimestamp()
if startTimestamp != 0 {
createdLabels := createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
nameStr,
name+createdSuffix,
)
addCreatedTimeSeriesIfNeeded(series, createdLabels, startTimestamp, metric.Type().String())
}
}
}

15
storage/remote/otlptranslator/update-copy.sh

@ -0,0 +1,15 @@
#!/bin/bash
OTEL_VERSION=v0.81.0
git clone https://github.com/open-telemetry/opentelemetry-collector-contrib ./tmp
cd ./tmp
git checkout $OTEL_VERSION
cd ..
rm -rf ./prometheusremotewrite/*
cp -r ./tmp/pkg/translator/prometheusremotewrite/*.go ./prometheusremotewrite
rm -rf ./prometheusremotewrite/*_test.go
rm -rf ./tmp
sed -i '' 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go
sed -i '' '1s#^#// DO NOT EDIT. COPIED AS-IS. SEE README.md\n\n#g' ./prometheusremotewrite/*.go

58
storage/remote/write_handler.go

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)
type writeHandler struct {
@ -178,3 +179,60 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
return nil
}
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
rwHandler := &writeHandler{
logger: logger,
appendable: appendable,
}
return &otlpWriteHandler{
logger: logger,
rwHandler: rwHandler,
}
}
type otlpWriteHandler struct {
logger log.Logger
rwHandler *writeHandler
}
func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
req, err := DecodeOTLPWriteRequest(r)
if err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
prwMetricsMap, errs := otlptranslator.FromMetrics(req.Metrics(), otlptranslator.Settings{})
if errs != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", errs)
}
prwMetrics := make([]prompb.TimeSeries, 0, len(prwMetricsMap))
for _, ts := range prwMetricsMap {
prwMetrics = append(prwMetrics, *ts)
}
err = h.rwHandler.write(r.Context(), &prompb.WriteRequest{
Timeseries: prwMetrics,
})
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrOutOfBounds, storage.ErrDuplicateSampleForTimestamp:
// Indicated an out of order sample is a bad request to prevent retries.
http.Error(w, err.Error(), http.StatusBadRequest)
return
default:
level.Error(h.logger).Log("msg", "Error appending remote write", "err", err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}

110
storage/remote/write_test.go

@ -14,6 +14,9 @@
package remote
import (
"bytes"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
@ -22,6 +25,9 @@ import (
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
@ -373,3 +379,107 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
err = s.Close()
require.NoError(t, err)
}
func TestOTLPWriteHandler(t *testing.T) {
exportRequest := generateOTLPWriteRequest(t)
buf, err := exportRequest.MarshalProto()
require.NoError(t, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")
appendable := &mockAppendable{}
handler := NewOTLPWriteHandler(nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
resp := recorder.Result()
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, 12, len(appendable.samples)) // 1 (counter) + 1 (gauge) + 1 (target_info) + 7 (hist_bucket) + 2 (hist_sum, hist_count)
require.Equal(t, 1, len(appendable.histograms)) // 1 (exponential histogram)
require.Equal(t, 1, len(appendable.exemplars)) // 1 (exemplar)
}
func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()
// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
// with resource attributes: service.name="test-service", service.instance.id="test-instance", host.name="test-host"
// with metric attibute: foo.bar="baz"
timestamp := time.Now()
resourceMetric := d.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
// Generate One Counter
counterMetric := scopeMetric.Metrics().AppendEmpty()
counterMetric.SetName("test-counter")
counterMetric.SetDescription("test-counter-description")
counterMetric.SetEmptySum()
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
counterMetric.Sum().SetIsMonotonic(true)
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterDataPoint.SetDoubleValue(10.0)
counterDataPoint.Attributes().PutStr("foo.bar", "baz")
counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterExemplar.SetDoubleValue(10.0)
counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
counterExemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})
// Generate One Gauge
gaugeMetric := scopeMetric.Metrics().AppendEmpty()
gaugeMetric.SetName("test-gauge")
gaugeMetric.SetDescription("test-gauge-description")
gaugeMetric.SetEmptyGauge()
gaugeDataPoint := gaugeMetric.Gauge().DataPoints().AppendEmpty()
gaugeDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
gaugeDataPoint.SetDoubleValue(10.0)
gaugeDataPoint.Attributes().PutStr("foo.bar", "baz")
// Generate One Histogram
histogramMetric := scopeMetric.Metrics().AppendEmpty()
histogramMetric.SetName("test-histogram")
histogramMetric.SetDescription("test-histogram-description")
histogramMetric.SetEmptyHistogram()
histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty()
histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2})
histogramDataPoint.SetCount(10)
histogramDataPoint.SetSum(30.0)
histogramDataPoint.Attributes().PutStr("foo.bar", "baz")
// Generate One Exponential-Histogram
exponentialHistogramMetric := scopeMetric.Metrics().AppendEmpty()
exponentialHistogramMetric.SetName("test-exponential-histogram")
exponentialHistogramMetric.SetDescription("test-exponential-histogram-description")
exponentialHistogramMetric.SetEmptyExponentialHistogram()
exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty()
exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
exponentialHistogramDataPoint.SetScale(2.0)
exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2})
exponentialHistogramDataPoint.SetZeroCount(2)
exponentialHistogramDataPoint.SetCount(10)
exponentialHistogramDataPoint.SetSum(30.0)
exponentialHistogramDataPoint.Attributes().PutStr("foo.bar", "baz")
return pmetricotlp.NewExportRequestFromMetrics(d)
}

21
web/api/v1/api.go

@ -217,6 +217,7 @@ type API struct {
remoteWriteHandler http.Handler
remoteReadHandler http.Handler
otlpWriteHandler http.Handler
codecs []Codec
}
@ -249,6 +250,8 @@ func NewAPI(
gatherer prometheus.Gatherer,
registerer prometheus.Registerer,
statsRenderer StatsRenderer,
rwEnabled bool,
otlpEnabled bool,
) *API {
a := &API{
QueryEngine: qe,
@ -285,9 +288,16 @@ func NewAPI(
a.statsRenderer = statsRenderer
}
if ap != nil {
if ap == nil && (rwEnabled || otlpEnabled) {
panic("remote write or otlp write enabled, but no appender passed in.")
}
if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap)
}
return a
}
@ -380,6 +390,7 @@ func (api *API) Register(r *route.Router) {
r.Get("/status/walreplay", api.serveWALReplayStatus)
r.Post("/read", api.ready(api.remoteRead))
r.Post("/write", api.ready(api.remoteWrite))
r.Post("/otlp/v1/metrics", api.ready(api.otlpWrite))
r.Get("/alerts", wrapAgent(api.alerts))
r.Get("/rules", wrapAgent(api.rules))
@ -1581,6 +1592,14 @@ func (api *API) remoteWrite(w http.ResponseWriter, r *http.Request) {
}
}
func (api *API) otlpWrite(w http.ResponseWriter, r *http.Request) {
if api.otlpWriteHandler != nil {
api.otlpWriteHandler.ServeHTTP(w, r)
} else {
http.Error(w, "otlp write receiver needs to be enabled with --enable-feature=otlp-write-receiver", http.StatusNotFound)
}
}
func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if !api.enableAdmin {
return apiFuncResult{nil, &apiError{errorUnavailable, errors.New("admin APIs disabled")}, nil, nil}

2
web/api/v1/errors_test.go

@ -134,6 +134,8 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
prometheus.DefaultGatherer,
nil,
nil,
false,
false,
)
promRouter := route.New().WithPrefix("/api/v1")

5
web/web.go

@ -259,6 +259,7 @@ type Options struct {
RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int
EnableRemoteWriteReceiver bool
EnableOTLPWriteReceiver bool
IsAgent bool
AppName string
@ -317,7 +318,7 @@ func New(logger log.Logger, o *Options) *Handler {
FactoryRr := func(_ context.Context) api_v1.RulesRetriever { return h.ruleManager }
var app storage.Appendable
if o.EnableRemoteWriteReceiver {
if o.EnableRemoteWriteReceiver || o.EnableOTLPWriteReceiver {
app = h.storage
}
@ -349,6 +350,8 @@ func New(logger log.Logger, o *Options) *Handler {
o.Gatherer,
o.Registerer,
nil,
o.EnableRemoteWriteReceiver,
o.EnableOTLPWriteReceiver,
)
if o.RoutePrefix != "/" {

Loading…
Cancel
Save