diff --git a/CHANGELOG.md b/CHANGELOG.md index 37cbea6ef..ff222790f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200 * [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706 +* [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042 ## 2.54.1 / 2024-08-27 diff --git a/promql/engine.go b/promql/engine.go index dd855c6d2..e55f154d2 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -435,6 +435,10 @@ func NewEngine(opts EngineOpts) *Engine { // Close closes ng. func (ng *Engine) Close() error { + if ng == nil { + return nil + } + if ng.activeQueryTracker != nil { return ng.activeQueryTracker.Close() } diff --git a/promql/engine_test.go b/promql/engine_test.go index 947c0e1ed..db399d865 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3019,6 +3019,29 @@ func TestEngineOptsValidation(t *testing.T) { } } +func TestEngine_Close(t *testing.T) { + t.Run("nil engine", func(t *testing.T) { + var ng *promql.Engine + require.NoError(t, ng.Close()) + }) + + t.Run("non-nil engine", func(t *testing.T) { + ng := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 0, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: nil, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: false, + LookbackDelta: 0, + EnableDelayedNameRemoval: true, + }) + require.NoError(t, ng.Close()) + }) +} + func TestInstantQueryWithRangeVectorSelector(t *testing.T) { engine := newTestEngine(t) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/context.go b/storage/remote/otlptranslator/prometheusremotewrite/context.go new file mode 100644 index 000000000..5c6dd20f1 --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/context.go @@ -0,0 +1,37 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package prometheusremotewrite + +import "context" + +// everyNTimes supports checking for context error every n times. +type everyNTimes struct { + n int + i int + err error +} + +// checkContext calls ctx.Err() every e.n times and returns an eventual error. +func (e *everyNTimes) checkContext(ctx context.Context) error { + if e.err != nil { + return e.err + } + + e.i++ + if e.i >= e.n { + e.i = 0 + e.err = ctx.Err() + } + + return e.err +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/context_test.go b/storage/remote/otlptranslator/prometheusremotewrite/context_test.go new file mode 100644 index 000000000..94b23be04 --- /dev/null +++ b/storage/remote/otlptranslator/prometheusremotewrite/context_test.go @@ -0,0 +1,40 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package prometheusremotewrite + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEveryNTimes(t *testing.T) { + const n = 128 + ctx, cancel := context.WithCancel(context.Background()) + e := &everyNTimes{ + n: n, + } + + for i := 0; i < n; i++ { + require.NoError(t, e.checkContext(ctx)) + } + + cancel() + for i := 0; i < n-1; i++ { + require.NoError(t, e.checkContext(ctx)) + } + require.EqualError(t, e.checkContext(ctx), context.Canceled.Error()) + // e should remember the error. + require.EqualError(t, e.checkContext(ctx), context.Canceled.Error()) +} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 67cf28119..fd7f58f07 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "encoding/hex" "fmt" "log" @@ -241,9 +242,13 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { // with the user defined bucket boundaries of non-exponential OTel histograms. // However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets: // https://github.com/prometheus/prometheus/issues/13485. -func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, - resource pcommon.Resource, settings Settings, baseName string) { +func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, + resource pcommon.Resource, settings Settings, baseName string) error { for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) @@ -284,6 +289,10 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + bound := pt.ExplicitBounds().At(i) cumulativeCount += pt.BucketCounts().At(i) bucket := &prompb.Sample{ @@ -312,7 +321,9 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra ts := c.addSample(infBucket, infLabels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) - c.addExemplars(pt, bucketBounds) + if err := c.addExemplars(ctx, pt, bucketBounds); err != nil { + return err + } startTimestamp := pt.StartTimestamp() if settings.ExportCreatedMetric && startTimestamp != 0 { @@ -320,6 +331,8 @@ func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.Histogra c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) } } + + return nil } type exemplarType interface { @@ -327,9 +340,13 @@ type exemplarType interface { Exemplars() pmetric.ExemplarSlice } -func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar { +func getPromExemplars[T exemplarType](ctx context.Context, everyN *everyNTimes, pt T) ([]prompb.Exemplar, error) { promExemplars := make([]prompb.Exemplar, 0, pt.Exemplars().Len()) for i := 0; i < pt.Exemplars().Len(); i++ { + if err := everyN.checkContext(ctx); err != nil { + return nil, err + } + exemplar := pt.Exemplars().At(i) exemplarRunes := 0 @@ -379,7 +396,7 @@ func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar { promExemplars = append(promExemplars, promExemplar) } - return promExemplars + return promExemplars, nil } // mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics @@ -417,9 +434,13 @@ func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { return ts } -func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, - settings Settings, baseName string) { +func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, + settings Settings, baseName string) error { for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) @@ -468,6 +489,8 @@ func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDat c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) } } + + return nil } // createLabels returns a copy of baseLabels, adding to it the pair model.MetricNameLabel=name. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go index e02ebbf5d..a48a57b06 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "testing" "time" @@ -280,6 +281,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { converter := NewPrometheusConverter() converter.addSummaryDataPoints( + context.Background(), metric.Summary().DataPoints(), pcommon.NewResource(), Settings{ @@ -390,6 +392,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { converter := NewPrometheusConverter() converter.addHistogramDataPoints( + context.Background(), metric.Histogram().DataPoints(), pcommon.NewResource(), Settings{ diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index ec93387fc..8349d4f90 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "fmt" "math" @@ -33,10 +34,14 @@ const defaultZeroThreshold = 1e-128 // addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series // as native histogram samples. -func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, +func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice, resource pcommon.Resource, settings Settings, promName string) (annotations.Annotations, error) { var annots annotations.Annotations for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return annots, err + } + pt := dataPoints.At(x) histogram, ws, err := exponentialToNativeHistogram(pt) @@ -57,15 +62,18 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetr ts, _ := c.getOrCreateTimeSeries(lbls) ts.Histograms = append(ts.Histograms, histogram) - exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt) + exemplars, err := getPromExemplars[pmetric.ExponentialHistogramDataPoint](ctx, &c.everyN, pt) + if err != nil { + return annots, err + } ts.Exemplars = append(ts.Exemplars, exemplars...) } return annots, nil } -// exponentialToNativeHistogram translates OTel Exponential Histogram data point -// to Prometheus Native Histogram. +// exponentialToNativeHistogram translates an OTel Exponential Histogram data point +// to a Prometheus Native Histogram. func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (prompb.Histogram, annotations.Annotations, error) { var annots annotations.Annotations scale := p.Scale() diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go index cd1c858ac..e064ab28a 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "fmt" "testing" "time" @@ -754,6 +755,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { converter := NewPrometheusConverter() annots, err := converter.addExponentialHistogramDataPoints( + context.Background(), metric.ExponentialHistogram().DataPoints(), pcommon.NewResource(), Settings{ diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 9d7680080..0afd2ad57 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "errors" "fmt" "sort" @@ -44,6 +45,7 @@ type Settings struct { type PrometheusConverter struct { unique map[uint64]*prompb.TimeSeries conflicts map[uint64][]*prompb.TimeSeries + everyN everyNTimes } func NewPrometheusConverter() *PrometheusConverter { @@ -54,7 +56,8 @@ func NewPrometheusConverter() *PrometheusConverter { } // FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) { +func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) { + c.everyN = everyNTimes{n: 128} resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) @@ -68,6 +71,11 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) // TODO: decide if instrumentation library information should be exported as labels for k := 0; k < metricSlice.Len(); k++ { + if err := c.everyN.checkContext(ctx); err != nil { + errs = multierr.Append(errs, err) + return + } + metric := metricSlice.At(k) mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) @@ -87,21 +95,36 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) + if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, promName); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } case pmetric.MetricTypeSum: dataPoints := metric.Sum().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - c.addSumNumberDataPoints(dataPoints, resource, metric, settings, promName) + if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } case pmetric.MetricTypeHistogram: dataPoints := metric.Histogram().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - c.addHistogramDataPoints(dataPoints, resource, settings, promName) + if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, promName); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } case pmetric.MetricTypeExponentialHistogram: dataPoints := metric.ExponentialHistogram().DataPoints() if dataPoints.Len() == 0 { @@ -109,20 +132,31 @@ func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) break } ws, err := c.addExponentialHistogramDataPoints( + ctx, dataPoints, resource, settings, promName, ) annots.Merge(ws) - errs = multierr.Append(errs, err) + if err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } case pmetric.MetricTypeSummary: dataPoints := metric.Summary().DataPoints() if dataPoints.Len() == 0 { errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - c.addSummaryDataPoints(dataPoints, resource, settings, promName) + if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, promName); err != nil { + errs = multierr.Append(errs, err) + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } + } default: errs = multierr.Append(errs, errors.New("unsupported metric type")) } @@ -148,25 +182,33 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { // addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, // the exemplar is added to the bucket bound's time series, provided that the time series' has samples. -func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { +func (c *PrometheusConverter) addExemplars(ctx context.Context, dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) error { if len(bucketBounds) == 0 { - return + return nil } - exemplars := getPromExemplars(dataPoint) + exemplars, err := getPromExemplars(ctx, &c.everyN, dataPoint) + if err != nil { + return err + } if len(exemplars) == 0 { - return + return nil } sort.Sort(byBucketBoundsData(bucketBounds)) for _, exemplar := range exemplars { for _, bound := range bucketBounds { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound { bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar) break } } } + + return nil } // addSample finds a TimeSeries that corresponds to lbls, and adds sample to it. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index bdc1c9d0b..641437632 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "fmt" "testing" "time" @@ -28,6 +29,39 @@ import ( ) func TestFromMetrics(t *testing.T) { + t.Run("successful", func(t *testing.T) { + converter := NewPrometheusConverter() + payload := createExportRequest(5, 128, 128, 2, 0) + + annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{}) + require.NoError(t, err) + require.Empty(t, annots) + }) + + t.Run("context cancellation", func(t *testing.T) { + converter := NewPrometheusConverter() + ctx, cancel := context.WithCancel(context.Background()) + // Verify that converter.FromMetrics respects cancellation. + cancel() + payload := createExportRequest(5, 128, 128, 2, 0) + + annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}) + require.ErrorIs(t, err, context.Canceled) + require.Empty(t, annots) + }) + + t.Run("context timeout", func(t *testing.T) { + converter := NewPrometheusConverter() + // Verify that converter.FromMetrics respects timeout. + ctx, cancel := context.WithTimeout(context.Background(), 0) + t.Cleanup(cancel) + payload := createExportRequest(5, 128, 128, 2, 0) + + annots, err := converter.FromMetrics(ctx, payload.Metrics(), Settings{}) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Empty(t, annots) + }) + t.Run("exponential histogram warnings for zero count and non-zero sum", func(t *testing.T) { request := pmetricotlp.NewExportRequest() rm := request.Metrics().ResourceMetrics().AppendEmpty() @@ -51,7 +85,7 @@ func TestFromMetrics(t *testing.T) { } converter := NewPrometheusConverter() - annots, err := converter.FromMetrics(request.Metrics(), Settings{}) + annots, err := converter.FromMetrics(context.Background(), request.Metrics(), Settings{}) require.NoError(t, err) require.NotEmpty(t, annots) ws, infos := annots.AsStrings("", 0, 0) @@ -84,7 +118,7 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for i := 0; i < b.N; i++ { converter := NewPrometheusConverter() - annots, err := converter.FromMetrics(payload.Metrics(), Settings{}) + annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), Settings{}) require.NoError(b, err) require.Empty(b, annots) require.NotNil(b, converter.TimeSeries()) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index 80ccb46c7..6cdab450e 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "math" "github.com/prometheus/common/model" @@ -27,9 +28,13 @@ import ( "github.com/prometheus/prometheus/prompb" ) -func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, - resource pcommon.Resource, settings Settings, name string) { +func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, name string) error { for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + pt := dataPoints.At(x) labels := createAttributes( resource, @@ -55,11 +60,17 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number } c.addSample(sample, labels) } + + return nil } -func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, - resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) { +func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) error { for x := 0; x < dataPoints.Len(); x++ { + if err := c.everyN.checkContext(ctx); err != nil { + return err + } + pt := dataPoints.At(x) lbls := createAttributes( resource, @@ -85,7 +96,10 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa } ts := c.addSample(sample, lbls) if ts != nil { - exemplars := getPromExemplars[pmetric.NumberDataPoint](pt) + exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt) + if err != nil { + return err + } ts.Exemplars = append(ts.Exemplars, exemplars...) } @@ -93,7 +107,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { startTimestamp := pt.StartTimestamp() if startTimestamp == 0 { - return + return nil } createdLabels := make([]prompb.Label, len(lbls)) @@ -107,4 +121,6 @@ func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDa c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) } } + + return nil } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go index 41afc8c4c..e93226964 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go @@ -17,6 +17,7 @@ package prometheusremotewrite import ( + "context" "testing" "time" @@ -66,6 +67,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { converter := NewPrometheusConverter() converter.addGaugeNumberDataPoints( + context.Background(), metric.Gauge().DataPoints(), pcommon.NewResource(), Settings{ @@ -242,6 +244,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) { converter := NewPrometheusConverter() converter.addSumNumberDataPoints( + context.Background(), metric.Sum().DataPoints(), pcommon.NewResource(), metric, diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 58fb668cc..736bc8eff 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -512,7 +512,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { otlpCfg := h.configFunc().OTLPConfig converter := otlptranslator.NewPrometheusConverter() - annots, err := converter.FromMetrics(req.Metrics(), otlptranslator.Settings{ + annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{ AddMetricSuffixes: true, PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, })