From abd99095957588cd309d811dce9977a73e806756 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 9 Mar 2022 22:26:43 +0000 Subject: [PATCH] Update package storage/remote for new labels.Labels type `QueueManager.externalLabels` becomes a slice rather than a `Labels` so we can index into it when doing the merge operation. Note we avoid calling `Labels.Len()` in `labelProtosToLabels()`. It isn't necessary - `append()` will enlarge the buffer and we're expecting to re-use it many times. Also, we now validate protobuf input before converting to Labels. This way we can detect errors first, and we don't place unnecessary requirements on the Labels structure. Re-do seriesFilter using labels.Builder (albeit N^2). Signed-off-by: Bryan Boreham --- storage/remote/codec.go | 28 +++++++---------- storage/remote/queue_manager.go | 55 +++++++++++++++------------------ storage/remote/read.go | 37 +++++++++++----------- 3 files changed, 54 insertions(+), 66 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index a74ad2b7b..431618c50 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -153,10 +153,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { - lbls := labelProtosToLabels(ts.Labels) - if err := validateLabelsAndMetricName(lbls); err != nil { + if err := validateLabelsAndMetricName(ts.Labels); err != nil { return errSeriesSet{err: err} } + lbls := labelProtosToLabels(ts.Labels) series = append(series, &concreteSeries{labels: lbls, samples: ts.Samples}) } @@ -346,7 +346,7 @@ type concreteSeries struct { } func (c *concreteSeries) Labels() labels.Labels { - return labels.New(c.labels...) + return c.labels.Copy() } func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { @@ -441,7 +441,7 @@ func (c *concreteSeriesIterator) Err() error { // validateLabelsAndMetricName validates the label names/values and metric names returned from remote read, // also making sure that there are no labels with duplicate names -func validateLabelsAndMetricName(ls labels.Labels) error { +func validateLabelsAndMetricName(ls []prompb.Label) error { for i, l := range ls { if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { return fmt.Errorf("invalid metric name: %v", l.Value) @@ -581,30 +581,24 @@ func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { } func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels { - result := make(labels.Labels, 0, len(labelPairs)) + b := labels.ScratchBuilder{} for _, l := range labelPairs { - result = append(result, labels.Label{ - Name: l.Name, - Value: l.Value, - }) + b.Add(l.Name, l.Value) } - sort.Sort(result) - return result + b.Sort() + return b.Labels() } // labelsToLabelsProto transforms labels into prompb labels. The buffer slice // will be used to avoid allocations if it is big enough to store the labels. -func labelsToLabelsProto(labels labels.Labels, buf []prompb.Label) []prompb.Label { +func labelsToLabelsProto(lbls labels.Labels, buf []prompb.Label) []prompb.Label { result := buf[:0] - if cap(buf) < len(labels) { - result = make([]prompb.Label, 0, len(labels)) - } - for _, l := range labels { + lbls.Range(func(l labels.Label) { result = append(result, prompb.Label{ Name: l.Name, Value: l.Value, }) - } + }) return result } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e701cb94b..f907615fc 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -396,7 +396,7 @@ type QueueManager struct { flushDeadline time.Duration cfg config.QueueConfig mcfg config.MetadataConfig - externalLabels labels.Labels + externalLabels []labels.Label relabelConfigs []*relabel.Config sendExemplars bool sendNativeHistograms bool @@ -454,13 +454,19 @@ func NewQueueManager( logger = log.NewNopLogger() } + // Copy externalLabels into slice which we need for processExternalLabels. + extLabelsSlice := make([]labels.Label, 0, externalLabels.Len()) + externalLabels.Range(func(l labels.Label) { + extLabelsSlice = append(extLabelsSlice, l) + }) + logger = log.With(logger, remoteName, client.Name(), endpoint, client.Endpoint()) t := &QueueManager{ logger: logger, flushDeadline: flushDeadline, cfg: cfg, mcfg: mCfg, - externalLabels: externalLabels, + externalLabels: extLabelsSlice, relabelConfigs: relabelConfigs, storeClient: client, sendExemplars: enableExemplarRemoteWrite, @@ -769,8 +775,8 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { t.seriesSegmentIndexes[s.Ref] = index ls := processExternalLabels(s.Labels, t.externalLabels) - lbls := relabel.Process(ls, t.relabelConfigs...) - if len(lbls) == 0 { + lbls, keep := relabel.Process(ls, t.relabelConfigs...) + if !keep || lbls.IsEmpty() { t.droppedSeries[s.Ref] = struct{}{} continue } @@ -831,44 +837,33 @@ func (t *QueueManager) client() WriteClient { } func (t *QueueManager) internLabels(lbls labels.Labels) { - for i, l := range lbls { - lbls[i].Name = t.interner.intern(l.Name) - lbls[i].Value = t.interner.intern(l.Value) - } + lbls.InternStrings(t.interner.intern) } func (t *QueueManager) releaseLabels(ls labels.Labels) { - for _, l := range ls { - t.interner.release(l.Name) - t.interner.release(l.Value) - } + ls.ReleaseStrings(t.interner.release) } // processExternalLabels merges externalLabels into ls. If ls contains // a label in externalLabels, the value in ls wins. -func processExternalLabels(ls, externalLabels labels.Labels) labels.Labels { - i, j, result := 0, 0, make(labels.Labels, 0, len(ls)+len(externalLabels)) - for i < len(ls) && j < len(externalLabels) { - if ls[i].Name < externalLabels[j].Name { - result = append(result, labels.Label{ - Name: ls[i].Name, - Value: ls[i].Value, - }) - i++ - } else if ls[i].Name > externalLabels[j].Name { - result = append(result, externalLabels[j]) +func processExternalLabels(ls labels.Labels, externalLabels []labels.Label) labels.Labels { + b := labels.NewScratchBuilder(ls.Len() + len(externalLabels)) + j := 0 + ls.Range(func(l labels.Label) { + for j < len(externalLabels) && l.Name > externalLabels[j].Name { + b.Add(externalLabels[j].Name, externalLabels[j].Value) j++ - } else { - result = append(result, labels.Label{ - Name: ls[i].Name, - Value: ls[i].Value, - }) - i++ + } + if j < len(externalLabels) && l.Name == externalLabels[j].Name { j++ } + b.Add(l.Name, l.Value) + }) + for ; j < len(externalLabels); j++ { + b.Add(externalLabels[j].Name, externalLabels[j].Value) } - return append(append(result, ls[i:]...), externalLabels[j:]...) + return b.Labels() } func (t *QueueManager) updateShardsLoop() { diff --git a/storage/remote/read.go b/storage/remote/read.go index 154eb73f9..21524d70d 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -180,9 +180,11 @@ func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers . // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. -func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { - el := make(labels.Labels, len(q.externalLabels)) - copy(el, q.externalLabels) +func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []string) { + el := make([]labels.Label, 0, q.externalLabels.Len()) + q.externalLabels.Range(func(l labels.Label) { + el = append(el, l) + }) // ms won't be sorted, so have to O(n^2) the search. for _, m := range ms { @@ -202,7 +204,11 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, lab } ms = append(ms, m) } - return ms, el + names := make([]string, len(el)) + for i := range el { + names[i] = el[i].Name + } + return ms, names } // LabelValues implements storage.Querier and is a noop. @@ -234,7 +240,8 @@ func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, match return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...)) } -func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet { +// Note strings in toFilter must be sorted. +func newSeriesSetFilter(ss storage.SeriesSet, toFilter []string) storage.SeriesSet { return &seriesSetFilter{ SeriesSet: ss, toFilter: toFilter, @@ -243,7 +250,7 @@ func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.Se type seriesSetFilter struct { storage.SeriesSet - toFilter labels.Labels + toFilter []string // Label names to remove from result querier storage.Querier } @@ -264,20 +271,12 @@ func (ssf seriesSetFilter) At() storage.Series { type seriesFilter struct { storage.Series - toFilter labels.Labels + toFilter []string // Label names to remove from result } func (sf seriesFilter) Labels() labels.Labels { - labels := sf.Series.Labels() - for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); { - if labels[i].Name < sf.toFilter[j].Name { - i++ - } else if labels[i].Name > sf.toFilter[j].Name { - j++ - } else { - labels = labels[:i+copy(labels[i:], labels[i+1:])] - j++ - } - } - return labels + b := labels.NewBuilder(sf.Series.Labels()) + // todo: check if this is too inefficient. + b.Del(sf.toFilter...) + return b.Labels(labels.EmptyLabels()) }