// Copyright 2024 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go // Provenance-includes-license: Apache-2.0 // Provenance-includes-copyright: Copyright The OpenTelemetry Authors. package prometheusremotewrite import ( "errors" "fmt" "sort" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "github.com/prometheus/prometheus/prompb" prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" ) type Settings struct { Namespace string ExternalLabels map[string]string DisableTargetInfo bool ExportCreatedMetric bool AddMetricSuffixes bool SendMetadata bool PromoteResourceAttributes []string } // PrometheusConverter converts from OTel write format to Prometheus remote write format. type PrometheusConverter struct { unique map[uint64]*prompb.TimeSeries conflicts map[uint64][]*prompb.TimeSeries } func NewPrometheusConverter() *PrometheusConverter { return &PrometheusConverter{ unique: map[uint64]*prompb.TimeSeries{}, conflicts: map[uint64][]*prompb.TimeSeries{}, } } // FromMetrics converts pmetric.Metrics to Prometheus remote write format. func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (errs error) { resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) resource := resourceMetrics.Resource() scopeMetricsSlice := resourceMetrics.ScopeMetrics() // keep track of the most recent timestamp in the ResourceMetrics for // use with the "target" info metric var mostRecentTimestamp pcommon.Timestamp for j := 0; j < scopeMetricsSlice.Len(); j++ { metricSlice := scopeMetricsSlice.At(j).Metrics() // TODO: decide if instrumentation library information should be exported as labels for k := 0; k < metricSlice.Len(); k++ { metric := metricSlice.At(k) mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) if !isValidAggregationTemporality(metric) { errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) continue } promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) // handle individual metrics based on type //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeGauge: dataPoints := metric.Gauge().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } c.addSumNumberDataPoints(dataPoints, resource, metric, settings, promName) case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } c.addHistogramDataPoints(dataPoints, resource, settings, promName) case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } errs = multierr.Append(errs, c.addExponentialHistogramDataPoints( dataPoints, resource, settings, promName, )) case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } c.addSummaryDataPoints(dataPoints, resource, settings, promName) default: errs = multierr.Append(errs, errors.New("unsupported metric type")) } } } addResourceTargetInfo(resource, settings, mostRecentTimestamp, c) } return } func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { if len(ts.Labels) != len(lbls) { return false } for i, l := range ts.Labels { if l.Name != ts.Labels[i].Name || l.Value != ts.Labels[i].Value { return false } } return true } // addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, // the exemplar is added to the bucket bound's time series, provided that the time series' has samples. func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { if len(bucketBounds) == 0 { return } exemplars := getPromExemplars(dataPoint) if len(exemplars) == 0 { return } sort.Sort(byBucketBoundsData(bucketBounds)) for _, exemplar := range exemplars { for _, bound := range bucketBounds { if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound { bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar) break } } } } // addSample finds a TimeSeries that corresponds to lbls, and adds sample to it. // If there is no corresponding TimeSeries already, it's created. // The corresponding TimeSeries is returned. // If either lbls is nil/empty or sample is nil, nothing is done. func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { if sample == nil || len(lbls) == 0 { // This shouldn't happen return nil } ts, _ := c.getOrCreateTimeSeries(lbls) ts.Samples = append(ts.Samples, *sample) return ts }