Merge pull request #14316 from pracucci/export-labelsToLabelsProto

Export remote.LabelsToLabelsProto() and remote.LabelProtosToLabels()
pull/14304/head
George Krajcsovits 5 months ago committed by GitHub
commit c25d6d8ac6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -166,7 +166,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
} }
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
Labels: labelsToLabelsProto(series.Labels(), nil), Labels: LabelsToLabelsProto(series.Labels(), nil),
Samples: samples, Samples: samples,
Histograms: histograms, Histograms: histograms,
}) })
@ -182,7 +182,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
if err := validateLabelsAndMetricName(ts.Labels); err != nil { if err := validateLabelsAndMetricName(ts.Labels); err != nil {
return errSeriesSet{err: err} return errSeriesSet{err: err}
} }
lbls := labelProtosToLabels(&b, ts.Labels) lbls := LabelProtosToLabels(&b, ts.Labels)
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms}) series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
} }
@ -235,7 +235,7 @@ func StreamChunkedReadResponses(
for ss.Next() { for ss.Next() {
series := ss.At() series := ss.At()
iter = series.Iterator(iter) iter = series.Iterator(iter)
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels) lbls = MergeLabels(LabelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
maxDataLength := maxBytesInFrame maxDataLength := maxBytesInFrame
for _, lbl := range lbls { for _, lbl := range lbls {
@ -622,7 +622,7 @@ func exemplarProtoToExemplar(b *labels.ScratchBuilder, ep prompb.Exemplar) exemp
timestamp := ep.Timestamp timestamp := ep.Timestamp
return exemplar.Exemplar{ return exemplar.Exemplar{
Labels: labelProtosToLabels(b, ep.Labels), Labels: LabelProtosToLabels(b, ep.Labels),
Value: ep.Value, Value: ep.Value,
Ts: timestamp, Ts: timestamp,
HasTs: timestamp != 0, HasTs: timestamp != 0,
@ -762,7 +762,9 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric {
return metric return metric
} }
func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels { // LabelProtosToLabels transforms prompb labels into labels. The labels builder
// will be used to build the returned labels.
func LabelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) labels.Labels {
b.Reset() b.Reset()
for _, l := range labelPairs { for _, l := range labelPairs {
b.Add(l.Name, l.Value) b.Add(l.Name, l.Value)
@ -771,9 +773,9 @@ func labelProtosToLabels(b *labels.ScratchBuilder, labelPairs []prompb.Label) la
return b.Labels() return b.Labels()
} }
// labelsToLabelsProto transforms labels into prompb labels. The buffer slice // LabelsToLabelsProto transforms labels into prompb labels. The buffer slice
// will be used to avoid allocations if it is big enough to store the labels. // will be used to avoid allocations if it is big enough to store the labels.
func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label { func LabelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label {
result := buf[:0] result := buf[:0]
lbls.Range(func(l labels.Label) { lbls.Range(func(l labels.Label) {
result = append(result, prompb.Label{ result = append(result, prompb.Label{

@ -729,8 +729,8 @@ func TestFloatHistogramToProtoConvert(t *testing.T) {
} }
func TestStreamResponse(t *testing.T) { func TestStreamResponse(t *testing.T) {
lbs1 := labelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil) lbs1 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost1", "job", "demo1"), nil)
lbs2 := labelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil) lbs2 := LabelsToLabelsProto(labels.FromStrings("instance", "localhost2", "job", "demo2"), nil)
chunk := prompb.Chunk{ chunk := prompb.Chunk{
Type: prompb.Chunk_XOR, Type: prompb.Chunk_XOR,
Data: make([]byte, 100), Data: make([]byte, 100),
@ -802,7 +802,7 @@ func (c *mockChunkSeriesSet) Next() bool {
func (c *mockChunkSeriesSet) At() storage.ChunkSeries { func (c *mockChunkSeriesSet) At() storage.ChunkSeries {
return &storage.ChunkSeriesEntry{ return &storage.ChunkSeriesEntry{
Lset: labelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels), Lset: LabelProtosToLabels(&c.builder, c.chunkedSeries[c.index].Labels),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
return &mockChunkIterator{ return &mockChunkIterator{
chunks: c.chunkedSeries[c.index].Chunks, chunks: c.chunkedSeries[c.index].Chunks,

@ -1507,7 +1507,7 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index. // stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) pendingData[nPending].Labels = LabelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
switch d.sType { switch d.sType {
case tSample: case tSample:
pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{
@ -1517,7 +1517,7 @@ func (s *shards) populateTimeSeries(batch []timeSeries, pendingData []prompb.Tim
nPendingSamples++ nPendingSamples++
case tExemplar: case tExemplar:
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{ pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, prompb.Exemplar{
Labels: labelsToLabelsProto(d.exemplarLabels, nil), Labels: LabelsToLabelsProto(d.exemplarLabels, nil),
Value: d.value, Value: d.value,
Timestamp: d.timestamp, Timestamp: d.timestamp,
}) })

@ -742,7 +742,7 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco
for _, s := range ss { for _, s := range ss {
seriesName := getSeriesNameFromRef(series[s.Ref]) seriesName := getSeriesNameFromRef(series[s.Ref])
e := prompb.Exemplar{ e := prompb.Exemplar{
Labels: labelsToLabelsProto(s.Labels, nil), Labels: LabelsToLabelsProto(s.Labels, nil),
Timestamp: s.T, Timestamp: s.T,
Value: s.V, Value: s.V,
} }
@ -826,7 +826,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
builder := labels.NewScratchBuilder(0) builder := labels.NewScratchBuilder(0)
count := 0 count := 0
for _, ts := range reqProto.Timeseries { for _, ts := range reqProto.Timeseries {
labels := labelProtosToLabels(&builder, ts.Labels) labels := LabelProtosToLabels(&builder, ts.Labels)
seriesName := labels.Get("__name__") seriesName := labels.Get("__name__")
for _, sample := range ts.Samples { for _, sample := range ts.Samples {
count++ count++

@ -172,12 +172,12 @@ func TestSeriesSetFilter(t *testing.T) {
toRemove: []string{"foo"}, toRemove: []string{"foo"},
in: &prompb.QueryResult{ in: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{ Timeseries: []*prompb.TimeSeries{
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b"), nil)}, {Labels: LabelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b"), nil)},
}, },
}, },
expected: &prompb.QueryResult{ expected: &prompb.QueryResult{
Timeseries: []*prompb.TimeSeries{ Timeseries: []*prompb.TimeSeries{
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b"), nil)}, {Labels: LabelsToLabelsProto(labels.FromStrings("a", "b"), nil)},
}, },
}, },
}, },
@ -211,7 +211,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom
q := &prompb.QueryResult{} q := &prompb.QueryResult{}
for _, s := range c.store { for _, s := range c.store {
l := labelProtosToLabels(&c.b, s.Labels) l := LabelProtosToLabels(&c.b, s.Labels)
var notMatch bool var notMatch bool
for _, m := range matchers { for _, m := range matchers {

@ -116,7 +116,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
b := labels.NewScratchBuilder(0) b := labels.NewScratchBuilder(0)
var exemplarErr error var exemplarErr error
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
labels := labelProtosToLabels(&b, ts.Labels) labels := LabelProtosToLabels(&b, ts.Labels)
if !labels.IsValid() { if !labels.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String()) level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
samplesWithInvalidLabels++ samplesWithInvalidLabels++

@ -60,14 +60,14 @@ func TestRemoteWriteHandler(t *testing.T) {
j := 0 j := 0
k := 0 k := 0
for _, ts := range writeRequestFixture.Timeseries { for _, ts := range writeRequestFixture.Timeseries {
labels := labelProtosToLabels(&b, ts.Labels) labels := LabelProtosToLabels(&b, ts.Labels)
for _, s := range ts.Samples { for _, s := range ts.Samples {
requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) requireEqual(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
i++ i++
} }
for _, e := range ts.Exemplars { for _, e := range ts.Exemplars {
exemplarLabels := labelProtosToLabels(&b, e.Labels) exemplarLabels := LabelProtosToLabels(&b, e.Labels)
requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) requireEqual(t, mockExemplar{labels, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
j++ j++
} }

Loading…
Cancel
Save