mirror of https://github.com/prometheus/prometheus
560 lines
18 KiB
Go
560 lines
18 KiB
Go
|
// DO NOT EDIT. COPIED AS-IS. SEE README.md
|
||
|
|
||
|
// Copyright The OpenTelemetry Authors
|
||
|
// SPDX-License-Identifier: Apache-2.0
|
||
|
|
||
|
package prometheusremotewrite // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
|
||
|
|
||
|
import (
|
||
|
"encoding/hex"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"math"
|
||
|
"sort"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
"unicode/utf8"
|
||
|
|
||
|
"github.com/prometheus/common/model"
|
||
|
"github.com/prometheus/prometheus/model/timestamp"
|
||
|
"github.com/prometheus/prometheus/model/value"
|
||
|
"github.com/prometheus/prometheus/prompb"
|
||
|
"go.opentelemetry.io/collector/pdata/pcommon"
|
||
|
"go.opentelemetry.io/collector/pdata/pmetric"
|
||
|
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
|
||
|
|
||
|
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
|
||
|
)
|
||
|
|
||
|
// Label names, metric-name suffixes and exemplar limits used by the
// translation helpers in this file.
const (
	// nameStr is the label name under which the metric name is stored.
	nameStr = "__name__"

	// Suffixes appended to a metric's base name for derived series.
	sumStr        = "_sum"
	countStr      = "_count"
	bucketStr     = "_bucket"
	createdSuffix = "_created"

	// leStr is the label carrying a histogram bucket's upper bound and
	// pInfStr is its value for the final, unbounded bucket.
	leStr   = "le"
	pInfStr = "+Inf"

	// quantileStr is the label carrying a summary quantile.
	quantileStr = "quantile"

	// maxExemplarRunes is the maximum number of UTF-8 characters an
	// exemplar's labels may contain according to the OpenMetrics
	// specification:
	// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
	maxExemplarRunes = 128

	// Trace and span ID label keys are defined as part of the OpenTelemetry
	// specification:
	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2
	traceIDKey = "trace_id"
	spanIDKey  = "span_id"

	// infoType is the datatype used when signing the target info series and
	// targetMetricName is that series' metric name.
	infoType         = "info"
	targetMetricName = "target_info"
)
|
||
|
|
||
|
// bucketBoundsData pairs a histogram bucket's upper bound with the signature
// of the time series that holds that bucket's samples.
type bucketBoundsData struct {
	sig   string
	bound float64
}

// byBucketBoundsData implements sort.Interface, ordering bucket bounds by
// ascending bound.
type byBucketBoundsData []bucketBoundsData

// Len reports the number of bucket bounds.
func (b byBucketBoundsData) Len() int {
	return len(b)
}

// Less reports whether the bound at index i is strictly below the one at j.
func (b byBucketBoundsData) Less(i, j int) bool {
	return b[i].bound < b[j].bound
}

// Swap exchanges the elements at indices i and j.
func (b byBucketBoundsData) Swap(i, j int) {
	tmp := b[i]
	b[i] = b[j]
	b[j] = tmp
}
|
||
|
|
||
|
// ByLabelName enables the usage of sort.Sort() with a slice of labels
|
||
|
type ByLabelName []prompb.Label
|
||
|
|
||
|
func (a ByLabelName) Len() int { return len(a) }
|
||
|
func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||
|
func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||
|
|
||
|
// addSample finds a TimeSeries in tsMap that corresponds to the label set labels, and add sample to the TimeSeries; it
|
||
|
// creates a new TimeSeries in the map if not found and returns the time series signature.
|
||
|
// tsMap will be unmodified if either labels or sample is nil, but can still be modified if the exemplar is nil.
|
||
|
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, labels []prompb.Label,
|
||
|
datatype string) string {
|
||
|
|
||
|
if sample == nil || labels == nil || tsMap == nil {
|
||
|
return ""
|
||
|
}
|
||
|
|
||
|
sig := timeSeriesSignature(datatype, &labels)
|
||
|
ts, ok := tsMap[sig]
|
||
|
|
||
|
if ok {
|
||
|
ts.Samples = append(ts.Samples, *sample)
|
||
|
} else {
|
||
|
newTs := &prompb.TimeSeries{
|
||
|
Labels: labels,
|
||
|
Samples: []prompb.Sample{*sample},
|
||
|
}
|
||
|
tsMap[sig] = newTs
|
||
|
}
|
||
|
|
||
|
return sig
|
||
|
}
|
||
|
|
||
|
// addExemplars finds a bucket bound that corresponds to the exemplars value and add the exemplar to the specific sig;
|
||
|
// we only add exemplars if samples are presents
|
||
|
// tsMap is unmodified if either of its parameters is nil and samples are nil.
|
||
|
func addExemplars(tsMap map[string]*prompb.TimeSeries, exemplars []prompb.Exemplar, bucketBoundsData []bucketBoundsData) {
|
||
|
if tsMap == nil || bucketBoundsData == nil || exemplars == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
sort.Sort(byBucketBoundsData(bucketBoundsData))
|
||
|
|
||
|
for _, exemplar := range exemplars {
|
||
|
addExemplar(tsMap, bucketBoundsData, exemplar)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBoundsData, exemplar prompb.Exemplar) {
|
||
|
for _, bucketBound := range bucketBounds {
|
||
|
sig := bucketBound.sig
|
||
|
bound := bucketBound.bound
|
||
|
|
||
|
_, ok := tsMap[sig]
|
||
|
if ok {
|
||
|
if tsMap[sig].Samples != nil {
|
||
|
if exemplar.Value <= bound {
|
||
|
tsMap[sig].Exemplars = append(tsMap[sig].Exemplars, exemplar)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// timeSeries return a string signature in the form of:
|
||
|
//
|
||
|
// TYPE-label1-value1- ... -labelN-valueN
|
||
|
//
|
||
|
// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
|
||
|
// the signature.
|
||
|
func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
|
||
|
b := strings.Builder{}
|
||
|
b.WriteString(datatype)
|
||
|
|
||
|
sort.Sort(ByLabelName(*labels))
|
||
|
|
||
|
for _, lb := range *labels {
|
||
|
b.WriteString("-")
|
||
|
b.WriteString(lb.GetName())
|
||
|
b.WriteString("-")
|
||
|
b.WriteString(lb.GetValue())
|
||
|
}
|
||
|
|
||
|
return b.String()
|
||
|
}
|
||
|
|
||
|
// createAttributes creates a slice of Cortex Label with OTLP attributes and pairs of string values.
|
||
|
// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is
|
||
|
// logged. Resultant label names are sanitized.
|
||
|
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label {
|
||
|
// map ensures no duplicate label name
|
||
|
l := map[string]prompb.Label{}
|
||
|
|
||
|
// Ensure attributes are sorted by key for consistent merging of keys which
|
||
|
// collide when sanitized.
|
||
|
labels := make([]prompb.Label, 0, attributes.Len())
|
||
|
attributes.Range(func(key string, value pcommon.Value) bool {
|
||
|
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
|
||
|
return true
|
||
|
})
|
||
|
sort.Stable(ByLabelName(labels))
|
||
|
|
||
|
for _, label := range labels {
|
||
|
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
|
||
|
if existingLabel, alreadyExists := l[finalKey]; alreadyExists {
|
||
|
existingLabel.Value = existingLabel.Value + ";" + label.Value
|
||
|
l[finalKey] = existingLabel
|
||
|
} else {
|
||
|
l[finalKey] = prompb.Label{
|
||
|
Name: finalKey,
|
||
|
Value: label.Value,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Map service.name + service.namespace to job
|
||
|
if serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName); ok {
|
||
|
val := serviceName.AsString()
|
||
|
if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok {
|
||
|
val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
|
||
|
}
|
||
|
l[model.JobLabel] = prompb.Label{
|
||
|
Name: model.JobLabel,
|
||
|
Value: val,
|
||
|
}
|
||
|
}
|
||
|
// Map service.instance.id to instance
|
||
|
if instance, ok := resource.Attributes().Get(conventions.AttributeServiceInstanceID); ok {
|
||
|
l[model.InstanceLabel] = prompb.Label{
|
||
|
Name: model.InstanceLabel,
|
||
|
Value: instance.AsString(),
|
||
|
}
|
||
|
}
|
||
|
for key, value := range externalLabels {
|
||
|
// External labels have already been sanitized
|
||
|
if _, alreadyExists := l[key]; alreadyExists {
|
||
|
// Skip external labels if they are overridden by metric attributes
|
||
|
continue
|
||
|
}
|
||
|
l[key] = prompb.Label{
|
||
|
Name: key,
|
||
|
Value: value,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for i := 0; i < len(extras); i += 2 {
|
||
|
if i+1 >= len(extras) {
|
||
|
break
|
||
|
}
|
||
|
_, found := l[extras[i]]
|
||
|
if found {
|
||
|
log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
|
||
|
}
|
||
|
// internal labels should be maintained
|
||
|
name := extras[i]
|
||
|
if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") {
|
||
|
name = prometheustranslator.NormalizeLabel(name)
|
||
|
}
|
||
|
l[name] = prompb.Label{
|
||
|
Name: name,
|
||
|
Value: extras[i+1],
|
||
|
}
|
||
|
}
|
||
|
|
||
|
s := make([]prompb.Label, 0, len(l))
|
||
|
for _, lb := range l {
|
||
|
s = append(s, lb)
|
||
|
}
|
||
|
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// isValidAggregationTemporality checks whether an OTel metric has a valid
|
||
|
// aggregation temporality for conversion to a Prometheus metric.
|
||
|
func isValidAggregationTemporality(metric pmetric.Metric) bool {
|
||
|
switch metric.Type() {
|
||
|
case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
|
||
|
return true
|
||
|
case pmetric.MetricTypeSum:
|
||
|
return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
|
||
|
case pmetric.MetricTypeHistogram:
|
||
|
return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
|
||
|
case pmetric.MetricTypeExponentialHistogram:
|
||
|
return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It
|
||
|
// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts)
|
||
|
func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings, tsMap map[string]*prompb.TimeSeries) {
|
||
|
timestamp := convertTimeStamp(pt.Timestamp())
|
||
|
// sum, count, and buckets of the histogram should append suffix to baseName
|
||
|
baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
|
||
|
|
||
|
// If the sum is unset, it indicates the _sum metric point should be
|
||
|
// omitted
|
||
|
if pt.HasSum() {
|
||
|
// treat sum as a sample in an individual TimeSeries
|
||
|
sum := &prompb.Sample{
|
||
|
Value: pt.Sum(),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
sum.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
|
||
|
sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
|
||
|
addSample(tsMap, sum, sumlabels, metric.Type().String())
|
||
|
|
||
|
}
|
||
|
|
||
|
// treat count as a sample in an individual TimeSeries
|
||
|
count := &prompb.Sample{
|
||
|
Value: float64(pt.Count()),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
count.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
|
||
|
countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
|
||
|
addSample(tsMap, count, countlabels, metric.Type().String())
|
||
|
|
||
|
// cumulative count for conversion to cumulative histogram
|
||
|
var cumulativeCount uint64
|
||
|
|
||
|
promExemplars := getPromExemplars[pmetric.HistogramDataPoint](pt)
|
||
|
|
||
|
var bucketBounds []bucketBoundsData
|
||
|
|
||
|
// process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1
|
||
|
for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ {
|
||
|
bound := pt.ExplicitBounds().At(i)
|
||
|
cumulativeCount += pt.BucketCounts().At(i)
|
||
|
bucket := &prompb.Sample{
|
||
|
Value: float64(cumulativeCount),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
bucket.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
|
||
|
labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
|
||
|
sig := addSample(tsMap, bucket, labels, metric.Type().String())
|
||
|
|
||
|
bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
|
||
|
}
|
||
|
// add le=+Inf bucket
|
||
|
infBucket := &prompb.Sample{
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
infBucket.Value = math.Float64frombits(value.StaleNaN)
|
||
|
} else {
|
||
|
infBucket.Value = float64(pt.Count())
|
||
|
}
|
||
|
infLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
|
||
|
sig := addSample(tsMap, infBucket, infLabels, metric.Type().String())
|
||
|
|
||
|
bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)})
|
||
|
addExemplars(tsMap, promExemplars, bucketBounds)
|
||
|
|
||
|
// add _created time series if needed
|
||
|
startTimestamp := pt.StartTimestamp()
|
||
|
if settings.ExportCreatedMetric && startTimestamp != 0 {
|
||
|
createdLabels := createAttributes(
|
||
|
resource,
|
||
|
pt.Attributes(),
|
||
|
settings.ExternalLabels,
|
||
|
nameStr,
|
||
|
baseName+createdSuffix,
|
||
|
)
|
||
|
addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type exemplarType interface {
|
||
|
pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint
|
||
|
Exemplars() pmetric.ExemplarSlice
|
||
|
}
|
||
|
|
||
|
func getPromExemplars[T exemplarType](pt T) []prompb.Exemplar {
|
||
|
var promExemplars []prompb.Exemplar
|
||
|
|
||
|
for i := 0; i < pt.Exemplars().Len(); i++ {
|
||
|
exemplar := pt.Exemplars().At(i)
|
||
|
exemplarRunes := 0
|
||
|
|
||
|
promExemplar := &prompb.Exemplar{
|
||
|
Value: exemplar.DoubleValue(),
|
||
|
Timestamp: timestamp.FromTime(exemplar.Timestamp().AsTime()),
|
||
|
}
|
||
|
if traceID := exemplar.TraceID(); !traceID.IsEmpty() {
|
||
|
val := hex.EncodeToString(traceID[:])
|
||
|
exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val)
|
||
|
promLabel := prompb.Label{
|
||
|
Name: traceIDKey,
|
||
|
Value: val,
|
||
|
}
|
||
|
promExemplar.Labels = append(promExemplar.Labels, promLabel)
|
||
|
}
|
||
|
if spanID := exemplar.SpanID(); !spanID.IsEmpty() {
|
||
|
val := hex.EncodeToString(spanID[:])
|
||
|
exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val)
|
||
|
promLabel := prompb.Label{
|
||
|
Name: spanIDKey,
|
||
|
Value: val,
|
||
|
}
|
||
|
promExemplar.Labels = append(promExemplar.Labels, promLabel)
|
||
|
}
|
||
|
var labelsFromAttributes []prompb.Label
|
||
|
|
||
|
exemplar.FilteredAttributes().Range(func(key string, value pcommon.Value) bool {
|
||
|
val := value.AsString()
|
||
|
exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val)
|
||
|
promLabel := prompb.Label{
|
||
|
Name: key,
|
||
|
Value: val,
|
||
|
}
|
||
|
|
||
|
labelsFromAttributes = append(labelsFromAttributes, promLabel)
|
||
|
|
||
|
return true
|
||
|
})
|
||
|
if exemplarRunes <= maxExemplarRunes {
|
||
|
// only append filtered attributes if it does not cause exemplar
|
||
|
// labels to exceed the max number of runes
|
||
|
promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...)
|
||
|
}
|
||
|
|
||
|
promExemplars = append(promExemplars, *promExemplar)
|
||
|
}
|
||
|
|
||
|
return promExemplars
|
||
|
}
|
||
|
|
||
|
// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics
|
||
|
func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp {
|
||
|
var ts pcommon.Timestamp
|
||
|
// handle individual metric based on type
|
||
|
switch metric.Type() {
|
||
|
case pmetric.MetricTypeGauge:
|
||
|
dataPoints := metric.Gauge().DataPoints()
|
||
|
for x := 0; x < dataPoints.Len(); x++ {
|
||
|
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
|
||
|
}
|
||
|
case pmetric.MetricTypeSum:
|
||
|
dataPoints := metric.Sum().DataPoints()
|
||
|
for x := 0; x < dataPoints.Len(); x++ {
|
||
|
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
|
||
|
}
|
||
|
case pmetric.MetricTypeHistogram:
|
||
|
dataPoints := metric.Histogram().DataPoints()
|
||
|
for x := 0; x < dataPoints.Len(); x++ {
|
||
|
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
|
||
|
}
|
||
|
case pmetric.MetricTypeExponentialHistogram:
|
||
|
dataPoints := metric.ExponentialHistogram().DataPoints()
|
||
|
for x := 0; x < dataPoints.Len(); x++ {
|
||
|
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
|
||
|
}
|
||
|
case pmetric.MetricTypeSummary:
|
||
|
dataPoints := metric.Summary().DataPoints()
|
||
|
for x := 0; x < dataPoints.Len(); x++ {
|
||
|
ts = maxTimestamp(ts, dataPoints.At(x).Timestamp())
|
||
|
}
|
||
|
}
|
||
|
return ts
|
||
|
}
|
||
|
|
||
|
func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
|
||
|
if a > b {
|
||
|
return a
|
||
|
}
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
|
||
|
func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Resource, metric pmetric.Metric, settings Settings,
|
||
|
tsMap map[string]*prompb.TimeSeries) {
|
||
|
timestamp := convertTimeStamp(pt.Timestamp())
|
||
|
// sum and count of the summary should append suffix to baseName
|
||
|
baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
|
||
|
// treat sum as a sample in an individual TimeSeries
|
||
|
sum := &prompb.Sample{
|
||
|
Value: pt.Sum(),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
sum.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
|
||
|
addSample(tsMap, sum, sumlabels, metric.Type().String())
|
||
|
|
||
|
// treat count as a sample in an individual TimeSeries
|
||
|
count := &prompb.Sample{
|
||
|
Value: float64(pt.Count()),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
count.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
|
||
|
addSample(tsMap, count, countlabels, metric.Type().String())
|
||
|
|
||
|
// process each percentile/quantile
|
||
|
for i := 0; i < pt.QuantileValues().Len(); i++ {
|
||
|
qt := pt.QuantileValues().At(i)
|
||
|
quantile := &prompb.Sample{
|
||
|
Value: qt.Value(),
|
||
|
Timestamp: timestamp,
|
||
|
}
|
||
|
if pt.Flags().NoRecordedValue() {
|
||
|
quantile.Value = math.Float64frombits(value.StaleNaN)
|
||
|
}
|
||
|
percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
|
||
|
qtlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName, quantileStr, percentileStr)
|
||
|
addSample(tsMap, quantile, qtlabels, metric.Type().String())
|
||
|
}
|
||
|
|
||
|
// add _created time series if needed
|
||
|
startTimestamp := pt.StartTimestamp()
|
||
|
if settings.ExportCreatedMetric && startTimestamp != 0 {
|
||
|
createdLabels := createAttributes(
|
||
|
resource,
|
||
|
pt.Attributes(),
|
||
|
settings.ExternalLabels,
|
||
|
nameStr,
|
||
|
baseName+createdSuffix,
|
||
|
)
|
||
|
addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// addCreatedTimeSeriesIfNeeded adds {name}_created time series with a single
|
||
|
// sample. If the series exists, then new samples won't be added.
|
||
|
func addCreatedTimeSeriesIfNeeded(
|
||
|
series map[string]*prompb.TimeSeries,
|
||
|
labels []prompb.Label,
|
||
|
startTimestamp pcommon.Timestamp,
|
||
|
metricType string,
|
||
|
) {
|
||
|
sig := timeSeriesSignature(metricType, &labels)
|
||
|
if _, ok := series[sig]; !ok {
|
||
|
series[sig] = &prompb.TimeSeries{
|
||
|
Labels: labels,
|
||
|
Samples: []prompb.Sample{
|
||
|
{ // convert ns to ms
|
||
|
Value: float64(convertTimeStamp(startTimestamp)),
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// addResourceTargetInfo converts the resource to the target info metric
|
||
|
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, tsMap map[string]*prompb.TimeSeries) {
|
||
|
if settings.DisableTargetInfo {
|
||
|
return
|
||
|
}
|
||
|
// Use resource attributes (other than those used for job+instance) as the
|
||
|
// metric labels for the target info metric
|
||
|
attributes := pcommon.NewMap()
|
||
|
resource.Attributes().CopyTo(attributes)
|
||
|
attributes.RemoveIf(func(k string, _ pcommon.Value) bool {
|
||
|
switch k {
|
||
|
case conventions.AttributeServiceName, conventions.AttributeServiceNamespace, conventions.AttributeServiceInstanceID:
|
||
|
// Remove resource attributes used for job + instance
|
||
|
return true
|
||
|
default:
|
||
|
return false
|
||
|
}
|
||
|
})
|
||
|
if attributes.Len() == 0 {
|
||
|
// If we only have job + instance, then target_info isn't useful, so don't add it.
|
||
|
return
|
||
|
}
|
||
|
// create parameters for addSample
|
||
|
name := targetMetricName
|
||
|
if len(settings.Namespace) > 0 {
|
||
|
name = settings.Namespace + "_" + name
|
||
|
}
|
||
|
labels := createAttributes(resource, attributes, settings.ExternalLabels, nameStr, name)
|
||
|
sample := &prompb.Sample{
|
||
|
Value: float64(1),
|
||
|
// convert ns to ms
|
||
|
Timestamp: convertTimeStamp(timestamp),
|
||
|
}
|
||
|
addSample(tsMap, sample, labels, infoType)
|
||
|
}
|
||
|
|
||
|
// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms
|
||
|
func convertTimeStamp(timestamp pcommon.Timestamp) int64 {
|
||
|
return timestamp.AsTime().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
|
||
|
}
|