From 16f71a7723b612dae7cabd4ef7f0f5048a540b92 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 14:44:57 +0100 Subject: [PATCH 01/14] Port codec.go over form 1.8 branch. --- storage/remote/client.go | 43 ++------ storage/remote/codec.go | 218 +++++++++++++++++++++++++++++++++++++++ storage/remote/read.go | 29 +----- 3 files changed, 230 insertions(+), 60 deletions(-) create mode 100644 storage/remote/codec.go diff --git a/storage/remote/client.go b/storage/remote/client.go index b2c054ac4..e5b5bf324 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -29,6 +29,7 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/util/httputil" ) @@ -73,29 +74,7 @@ type recoverableError struct { // Store sends a batch of samples to the HTTP endpoint. func (c *Client) Store(samples model.Samples) error { - req := &prompb.WriteRequest{ - Timeseries: make([]*prompb.TimeSeries, 0, len(samples)), - } - for _, s := range samples { - ts := &prompb.TimeSeries{ - Labels: make([]*prompb.Label, 0, len(s.Metric)), - } - for k, v := range s.Metric { - ts.Labels = append(ts.Labels, - &prompb.Label{ - Name: string(k), - Value: string(v), - }) - } - ts.Samples = []*prompb.Sample{ - { - Value: float64(s.Value), - Timestamp: int64(s.Timestamp), - }, - } - req.Timeseries = append(req.Timeseries, ts) - } - + req := ToWriteRequest(samples) data, err := proto.Marshal(req) if err != nil { return err @@ -143,17 +122,17 @@ func (c Client) Name() string { } // Read reads from a remote endpoint. -func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) { - req := &prompb.ReadRequest{ - // TODO: Support batching multiple queries into one read request, - // as the protobuf interface allows for it. - Queries: []*prompb.Query{{ - StartTimestampMs: from, - EndTimestampMs: through, - Matchers: matchers, - }}, +func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labels.Matcher) ([]*prompb.TimeSeries, error) { + query, err := ToQuery(from, through, matchers) + if err != nil { + return nil, err } + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{ + query, + }, + } data, err := proto.Marshal(req) if err != nil { return nil, fmt.Errorf("unable to marshal read request: %v", err) diff --git a/storage/remote/codec.go b/storage/remote/codec.go new file mode 100644 index 000000000..2d5b7bb55 --- /dev/null +++ b/storage/remote/codec.go @@ -0,0 +1,218 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" +) + +// DecodeReadRequest reads a remote.Request from a http.Request. +func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) { + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + return nil, err + } + + return &req, nil +} + +// EncodeReadResponse writes a remote.Response to a http.ResponseWriter. +func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error { + data, err := proto.Marshal(resp) + if err != nil { + return err + } + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + + compressed := snappy.Encode(nil, data) + _, err = w.Write(compressed) + return err +} + +// ToWriteRequest converts an array of samples into a WriteRequest proto. +func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest { + req := &prompb.WriteRequest{ + Timeseries: make([]*prompb.TimeSeries, 0, len(samples)), + } + + for _, s := range samples { + ts := prompb.TimeSeries{ + Labels: ToLabelPairs(s.Metric), + Samples: []*prompb.Sample{ + { + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }, + }, + } + req.Timeseries = append(req.Timeseries, &ts) + } + + return req +} + +// ToQuery builds a Query proto. +func ToQuery(from, to int64, matchers []*labels.Matcher) (*prompb.Query, error) { + ms, err := toLabelMatchers(matchers) + if err != nil { + return nil, err + } + + return &prompb.Query{ + StartTimestampMs: from, + EndTimestampMs: to, + Matchers: ms, + }, nil +} + +// FromQuery unpacks a Query proto. +func FromQuery(req *prompb.Query) (model.Time, model.Time, []*labels.Matcher, error) { + matchers, err := fromLabelMatchers(req.Matchers) + if err != nil { + return 0, 0, nil, err + } + from := model.Time(req.StartTimestampMs) + to := model.Time(req.EndTimestampMs) + return from, to, matchers, nil +} + +// ToQueryResult builds a QueryResult proto. +func ToQueryResult(matrix model.Matrix) *prompb.QueryResult { + resp := &prompb.QueryResult{} + for _, ss := range matrix { + ts := prompb.TimeSeries{ + Labels: ToLabelPairs(ss.Metric), + Samples: make([]*prompb.Sample, 0, len(ss.Values)), + } + for _, s := range ss.Values { + ts.Samples = append(ts.Samples, &prompb.Sample{ + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }) + } + resp.Timeseries = append(resp.Timeseries, &ts) + } + return resp +} + +// FromQueryResult unpacks a QueryResult proto. +func FromQueryResult(resp *prompb.QueryResult) model.Matrix { + m := make(model.Matrix, 0, len(resp.Timeseries)) + for _, ts := range resp.Timeseries { + var ss model.SampleStream + ss.Metric = FromLabelPairs(ts.Labels) + ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) + for _, s := range ts.Samples { + ss.Values = append(ss.Values, model.SamplePair{ + Value: model.SampleValue(s.Value), + Timestamp: model.Time(s.Timestamp), + }) + } + m = append(m, &ss) + } + + return m +} + +func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) { + pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) + for _, m := range matchers { + var mType prompb.LabelMatcher_Type + switch m.Type { + case labels.MatchEqual: + mType = prompb.LabelMatcher_EQ + case labels.MatchNotEqual: + mType = prompb.LabelMatcher_NEQ + case labels.MatchRegexp: + mType = prompb.LabelMatcher_RE + case labels.MatchNotRegexp: + mType = prompb.LabelMatcher_NRE + default: + return nil, fmt.Errorf("invalid matcher type") + } + pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ + Type: mType, + Name: m.Name, + Value: m.Value, + }) + } + return pbMatchers, nil +} + +func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) { + result := make([]*labels.Matcher, 0, len(matchers)) + for _, matcher := range matchers { + var mtype labels.MatchType + switch matcher.Type { + case prompb.LabelMatcher_EQ: + mtype = labels.MatchEqual + case prompb.LabelMatcher_NEQ: + mtype = labels.MatchNotEqual + case prompb.LabelMatcher_RE: + mtype = labels.MatchRegexp + case prompb.LabelMatcher_NRE: + mtype = labels.MatchNotRegexp + default: + return nil, fmt.Errorf("invalid matcher type") + } + matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value) + if err != nil { + return nil, err + } + result = append(result, matcher) + } + return result, nil +} + +// ToLabelPairs builds a []LabelPair from a model.Metric +func ToLabelPairs(metric model.Metric) []*prompb.Label { + labelPairs := make([]*prompb.Label, 0, len(metric)) + for k, v := range metric { + labelPairs = append(labelPairs, &prompb.Label{ + Name: string(k), + Value: string(v), + }) + } + return labelPairs +} + +// FromLabelPairs unpack a []LabelPair to a model.Metric +func FromLabelPairs(labelPairs []*prompb.Label) model.Metric { + metric := make(model.Metric, len(labelPairs)) + for _, l := range labelPairs { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} diff --git a/storage/remote/read.go b/storage/remote/read.go index fc2c873aa..37587e5bf 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -18,7 +18,6 @@ import ( "sort" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -69,8 +68,7 @@ type querier struct { // Select returns a set of series that matches the given label matchers. func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { m, added := q.addExternalLabels(matchers) - - res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m)) + res, err := q.client.Read(context.TODO(), q.mint, q.maxt, m) if err != nil { return errSeriesSet{err: err} } @@ -90,31 +88,6 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { } } -func labelMatchersToProto(matchers []*labels.Matcher) []*prompb.LabelMatcher { - pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) - for _, m := range matchers { - var mType prompb.LabelMatcher_Type - switch m.Type { - case labels.MatchEqual: - mType = prompb.LabelMatcher_EQ - case labels.MatchNotEqual: - mType = prompb.LabelMatcher_NEQ - case labels.MatchRegexp: - mType = prompb.LabelMatcher_RE - case labels.MatchNotRegexp: - mType = prompb.LabelMatcher_NRE - default: - panic("invalid matcher type") - } - pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ - Type: mType, - Name: string(m.Name), - Value: string(m.Value), - }) - } - return pbMatchers -} - func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels { result := make(labels.Labels, 0, len(labelPairs)) for _, l := range labelPairs { From 3760f56c0c7341cccb6bd8c80f94822a05be4066 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 14:53:43 +0100 Subject: [PATCH 02/14] remote: Expose ClientConfig type (see #3165) --- storage/remote/client.go | 23 +++++++++++++---------- storage/remote/client_test.go | 6 +++--- storage/remote/read_test.go | 10 +++++----- storage/remote/storage.go | 18 +++++++++--------- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index e5b5bf324..421506382 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -45,26 +45,27 @@ type Client struct { readRecent bool } -type clientConfig struct { - url *config.URL - timeout model.Duration - readRecent bool - httpClientConfig config.HTTPClientConfig +// ClientConfig configures a Client. +type ClientConfig struct { + URL *config.URL + Timeout model.Duration + ReadRecent bool + HTTPClientConfig config.HTTPClientConfig } // NewClient creates a new Client. -func NewClient(index int, conf *clientConfig) (*Client, error) { - httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig, "remote_storage") +func NewClient(index int, conf *ClientConfig) (*Client, error) { + httpClient, err := httputil.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage") if err != nil { return nil, err } return &Client{ index: index, - url: conf.url, + url: conf.URL, client: httpClient, - timeout: time.Duration(conf.timeout), - readRecent: conf.readRecent, + timeout: time.Duration(conf.Timeout), + readRecent: conf.ReadRecent, }, nil } @@ -129,6 +130,8 @@ func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labe } req := &prompb.ReadRequest{ + // TODO: Support batching multiple queries into one read request, + // as the protobuf interface allows for it. Queries: []*prompb.Query{ query, }, diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 5790f3f00..9f3293ceb 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -64,9 +64,9 @@ func TestStoreHTTPErrorHandling(t *testing.T) { t.Fatal(err) } - c, err := NewClient(0, &clientConfig{ - url: &config.URL{serverURL}, - timeout: model.Duration(time.Second), + c, err := NewClient(0, &ClientConfig{ + URL: &config.URL{serverURL}, + Timeout: model.Duration(time.Second), }) if err != nil { t.Fatal(err) diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 7fec0bb8e..c32ee9f67 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -184,11 +184,11 @@ func TestRemoteStorageQuerier(t *testing.T) { s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) s.clients = []*Client{} for _, readRecent := range test.readRecentClients { - c, _ := NewClient(0, &clientConfig{ - url: nil, - timeout: model.Duration(30 * time.Second), - httpClientConfig: config.HTTPClientConfig{}, - readRecent: readRecent, + c, _ := NewClient(0, &ClientConfig{ + URL: nil, + Timeout: model.Duration(30 * time.Second), + HTTPClientConfig: config.HTTPClientConfig{}, + ReadRecent: readRecent, }) s.clients = append(s.clients, c) } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 65f3f2d19..e5bc3aaa5 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -58,10 +58,10 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. for i, rwConf := range conf.RemoteWriteConfigs { - c, err := NewClient(i, &clientConfig{ - url: rwConf.URL, - timeout: rwConf.RemoteTimeout, - httpClientConfig: rwConf.HTTPClientConfig, + c, err := NewClient(i, &ClientConfig{ + URL: rwConf.URL, + Timeout: rwConf.RemoteTimeout, + HTTPClientConfig: rwConf.HTTPClientConfig, }) if err != nil { return err @@ -88,11 +88,11 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { clients := []*Client{} for i, rrConf := range conf.RemoteReadConfigs { - c, err := NewClient(i, &clientConfig{ - url: rrConf.URL, - timeout: rrConf.RemoteTimeout, - httpClientConfig: rrConf.HTTPClientConfig, - readRecent: rrConf.ReadRecent, + c, err := NewClient(i, &ClientConfig{ + URL: rrConf.URL, + Timeout: rrConf.RemoteTimeout, + HTTPClientConfig: rrConf.HTTPClientConfig, + ReadRecent: rrConf.ReadRecent, }) if err != nil { return err From 8fe0212ff7b5300be2b6d1075c7cd0d2d0d495e6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 14:57:44 +0100 Subject: [PATCH 03/14] Port 'Make queue manager configurable.' to 2.0, see #2991 --- storage/remote/queue_manager.go | 44 +++------------------------- storage/remote/queue_manager_test.go | 17 ++++++----- storage/remote/storage.go | 2 +- 3 files changed, 14 insertions(+), 49 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3f5e59338..591c5e9a2 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -124,42 +124,6 @@ func init() { prometheus.MustRegister(numShards) } -// QueueManagerConfig is the configuration for the queue used to write to remote -// storage. -type QueueManagerConfig struct { - // Number of samples to buffer per shard before we start dropping them. - QueueCapacity int - // Max number of shards, i.e. amount of concurrency. - MaxShards int - // Maximum number of samples per send. - MaxSamplesPerSend int - // Maximum time sample will wait in buffer. - BatchSendDeadline time.Duration - // Max number of times to retry a batch on recoverable errors. - MaxRetries int - // On recoverable errors, backoff exponentially. - MinBackoff time.Duration - MaxBackoff time.Duration -} - -// defaultQueueManagerConfig is the default remote queue configuration. -var defaultQueueManagerConfig = QueueManagerConfig{ - // With a maximum of 1000 shards, assuming an average of 100ms remote write - // time and 100 samples per batch, we will be able to push 1M samples/s. - MaxShards: 1000, - MaxSamplesPerSend: 100, - - // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At - // 1000 shards, this will buffer 100M samples total. - QueueCapacity: 100 * 1000, - BatchSendDeadline: 5 * time.Second, - - // Max number of times to retry a batch on recoverable errors. - MaxRetries: 10, - MinBackoff: 30 * time.Millisecond, - MaxBackoff: 100 * time.Millisecond, -} - // StorageClient defines an interface for sending a batch of samples to an // external timeseries database. type StorageClient interface { @@ -174,7 +138,7 @@ type StorageClient interface { type QueueManager struct { logger log.Logger - cfg QueueManagerConfig + cfg config.QueueConfig externalLabels model.LabelSet relabelConfigs []*config.RelabelConfig client StorageClient @@ -193,7 +157,7 @@ type QueueManager struct { } // NewQueueManager builds a new QueueManager. -func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { +func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { if logger == nil { logger = log.NewNopLogger() } @@ -216,7 +180,7 @@ func NewQueueManager(logger log.Logger, cfg QueueManagerConfig, externalLabels m } t.shards = t.newShards(t.numShards) numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) - queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) return t } @@ -408,7 +372,7 @@ type shards struct { func (t *QueueManager) newShards(numShards int) *shards { queues := make([]chan *model.Sample, numShards) for i := 0; i < numShards; i++ { - queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + queues[i] = make(chan *model.Sample, t.cfg.Capacity) } s := &shards{ qm: t, diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index e621496cb..c34dd94e9 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" ) type TestStorageClient struct { @@ -81,7 +82,7 @@ func (c *TestStorageClient) Name() string { func TestSampleDelivery(t *testing.T) { // Let's create an even number of send batches so we don't run into the // batch timeout case. - n := defaultQueueManagerConfig.QueueCapacity * 2 + n := config.DefaultQueueConfig.Capacity * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -97,7 +98,7 @@ func TestSampleDelivery(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples[:len(samples)/2]) - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 m := NewQueueManager(nil, cfg, nil, nil, c) @@ -117,7 +118,7 @@ func TestSampleDelivery(t *testing.T) { func TestSampleDeliveryOrder(t *testing.T) { ts := 10 - n := defaultQueueManagerConfig.MaxSamplesPerSend * ts + n := config.DefaultQueueConfig.MaxSamplesPerSend * ts samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -133,7 +134,7 @@ func TestSampleDeliveryOrder(t *testing.T) { c := NewTestStorageClient() c.expectSamples(samples) - m := NewQueueManager(nil, defaultQueueManagerConfig, nil, nil, c) + m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c) // These should be received by the client. for _, s := range samples { @@ -194,7 +195,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { // `MaxSamplesPerSend*Shards` samples should be consumed by the // per-shard goroutines, and then another `MaxSamplesPerSend` // should be left on the queue. - n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 + n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 samples := make(model.Samples, 0, n) for i := 0; i < n; i++ { @@ -208,9 +209,9 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { } c := NewTestBlockedStorageClient() - cfg := defaultQueueManagerConfig + cfg := config.DefaultQueueConfig cfg.MaxShards = 1 - cfg.QueueCapacity = n + cfg.Capacity = n m := NewQueueManager(nil, cfg, nil, nil, c) m.Start() @@ -240,7 +241,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { time.Sleep(10 * time.Millisecond) } - if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend { + if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend { t.Fatalf("Failed to drain QueueManager queue, %d elements left", m.queueLen(), ) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index e5bc3aaa5..bb8c17372 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -68,7 +68,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { } newQueues = append(newQueues, NewQueueManager( s.logger, - defaultQueueManagerConfig, + config.DefaultQueueConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c, From 08b73286694d701c43089267a52b6eea748786a7 Mon Sep 17 00:00:00 2001 From: Conor Broderick Date: Mon, 24 Jul 2017 13:49:20 +0100 Subject: [PATCH 04/14] Port Metric name validation to 2.0 (see #2975) --- storage/remote/read.go | 21 +++++++ storage/remote/read_test.go | 122 ++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) diff --git a/storage/remote/read.go b/storage/remote/read.go index 37587e5bf..706d632c2 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -15,6 +15,7 @@ package remote import ( "context" + "fmt" "sort" "github.com/prometheus/common/model" @@ -77,6 +78,10 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { for _, ts := range res { labels := labelPairsToLabels(ts.Labels) removeLabels(&labels, added) + if err := validateLabelsAndMetricName(labels); err != nil { + return errSeriesSet{err: err} + } + series = append(series, &concreteSeries{ labels: labels, samples: ts.Samples, @@ -205,6 +210,22 @@ func (c *concreteSeriesIterator) Err() error { return nil } +// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read. +func validateLabelsAndMetricName(ls labels.Labels) error { + for _, l := range ls { + if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { + return fmt.Errorf("Invalid metric name: %v", l.Value) + } + if !model.LabelName(l.Name).IsValid() { + return fmt.Errorf("Invalid label name: %v", l.Name) + } + if !model.LabelValue(l.Value).IsValid() { + return fmt.Errorf("Invalid label value: %v", l.Value) + } + } + return nil +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index c32ee9f67..376dce3a3 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -27,6 +27,128 @@ import ( "github.com/prometheus/prometheus/storage" ) +func TestValidateLabelsAndMetricName(t *testing.T) { + tests := []struct { + result model.Matrix + expectedErr string + shouldPass bool + }{ + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "labelName": "labelValue", + }, + }, + }, + expectedErr: "", + shouldPass: true, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "_labelName": "labelValue", + }, + }, + }, + expectedErr: "", + shouldPass: true, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "@labelName": "labelValue", + }, + }, + }, + expectedErr: "Invalid label name: @labelName", + shouldPass: false, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "123labelName": "labelValue", + }, + }, + }, + expectedErr: "Invalid label name: 123labelName", + shouldPass: false, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "": "labelValue", + }, + }, + }, + expectedErr: "Invalid label name: ", + shouldPass: false, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "name", + "labelName": model.LabelValue([]byte{0xff}), + }, + }, + }, + expectedErr: "Invalid label value: " + string([]byte{0xff}), + shouldPass: false, + }, + { + result: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + "__name__": "@invalid_name", + }, + }, + }, + expectedErr: "Invalid metric name: @invalid_name", + shouldPass: false, + }, + } + + for _, test := range tests { + var err error + for _, ss := range test.result { + ls := make(labels.Labels, 0, len(ss.Metric)) + for k, v := range ss.Metric { + ls = append(ls, labels.Label{ + Name: string(k), + Value: string(v), + }) + } + err = validateLabelsAndMetricName(ls) + if err != nil { + break + } + } + if test.shouldPass { + if err != nil { + t.Fatalf("Test should pass, got unexpected error: %v", err) + } + continue + } + if err != nil { + if err.Error() != test.expectedErr { + t.Fatalf("Unexpected error, got: %v, expected: %v", err, test.expectedErr) + } + } else { + t.Fatalf("Expected error, got none") + } + } +} + func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { m, err := labels.NewMatcher(mt, name, val) if err != nil { From 56820726fa6f2621280f4390d91b6a7cc1d47467 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 16:56:47 +0100 Subject: [PATCH 05/14] Move a couple of the encoding/decoding functions into codec.go --- storage/remote/codec.go | 37 +++++++++++++++++++++++++++++-------- storage/remote/read.go | 14 +------------- storage/remote/write.go | 9 --------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 2d5b7bb55..214d1468d 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -1,4 +1,4 @@ -// Copyright 2016 The Prometheus Authors +// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ import ( "fmt" "io/ioutil" "net/http" + "sort" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -69,7 +70,7 @@ func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest { for _, s := range samples { ts := prompb.TimeSeries{ - Labels: ToLabelPairs(s.Metric), + Labels: MetricToLabelProtos(s.Metric), Samples: []*prompb.Sample{ { Value: float64(s.Value), @@ -113,7 +114,7 @@ func ToQueryResult(matrix model.Matrix) *prompb.QueryResult { resp := &prompb.QueryResult{} for _, ss := range matrix { ts := prompb.TimeSeries{ - Labels: ToLabelPairs(ss.Metric), + Labels: MetricToLabelProtos(ss.Metric), Samples: make([]*prompb.Sample, 0, len(ss.Values)), } for _, s := range ss.Values { @@ -132,7 +133,7 @@ func FromQueryResult(resp *prompb.QueryResult) model.Matrix { m := make(model.Matrix, 0, len(resp.Timeseries)) for _, ts := range resp.Timeseries { var ss model.SampleStream - ss.Metric = FromLabelPairs(ts.Labels) + ss.Metric = LabelProtosToMetric(ts.Labels) ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) for _, s := range ts.Samples { ss.Values = append(ss.Values, model.SamplePair{ @@ -196,8 +197,8 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro return result, nil } -// ToLabelPairs builds a []LabelPair from a model.Metric -func ToLabelPairs(metric model.Metric) []*prompb.Label { +// MetricToLabelProtos builds a []*prompb.Label from a model.Metric +func MetricToLabelProtos(metric model.Metric) []*prompb.Label { labelPairs := make([]*prompb.Label, 0, len(metric)) for k, v := range metric { labelPairs = append(labelPairs, &prompb.Label{ @@ -208,11 +209,31 @@ func ToLabelPairs(metric model.Metric) []*prompb.Label { return labelPairs } -// FromLabelPairs unpack a []LabelPair to a model.Metric -func FromLabelPairs(labelPairs []*prompb.Label) model.Metric { +// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric +func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { metric := make(model.Metric, len(labelPairs)) for _, l := range labelPairs { metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) } return metric } + +func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels { + result := make(labels.Labels, 0, len(labelPairs)) + for _, l := range labelPairs { + result = append(result, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(result) + return result +} + +func labelsToMetric(ls labels.Labels) model.Metric { + metric := make(model.Metric, len(ls)) + for _, l := range ls { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} diff --git a/storage/remote/read.go b/storage/remote/read.go index 706d632c2..e15a3b618 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -76,7 +76,7 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { series := make([]storage.Series, 0, len(res)) for _, ts := range res { - labels := labelPairsToLabels(ts.Labels) + labels := labelProtosToLabels(ts.Labels) removeLabels(&labels, added) if err := validateLabelsAndMetricName(labels); err != nil { return errSeriesSet{err: err} @@ -93,18 +93,6 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { } } -func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels { - result := make(labels.Labels, 0, len(labelPairs)) - for _, l := range labelPairs { - result = append(result, labels.Label{ - Name: l.Name, - Value: l.Value, - }) - } - sort.Sort(result) - return result -} - type byLabel []storage.Series func (a byLabel) Len() int { return len(a) } diff --git a/storage/remote/write.go b/storage/remote/write.go index 5cbd926d7..514dcbe43 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -38,15 +38,6 @@ func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, nil } -func labelsToMetric(ls labels.Labels) model.Metric { - metric := make(model.Metric, len(ls)) - for _, l := range ls { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric -} - -// AddFast implements storage.Appender. func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error { _, err := s.Add(l, t, v) return err From 0997191b182bc40a1e050856084fe9b8dc46f523 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 5 Oct 2017 12:32:24 +0100 Subject: [PATCH 06/14] Port 'Don't disable HTTP keep-alives for remote storage connections.' to 2.0 (see #3173) Removes configurability introduced in #3160 in favour of hard-coding, per advice from @brian-brazil. --- storage/remote/client.go | 2 +- util/httputil/client.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 421506382..79ccb5910 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -55,7 +55,7 @@ type ClientConfig struct { // NewClient creates a new Client. func NewClient(index int, conf *ClientConfig) (*Client, error) { - httpClient, err := httputil.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage") + httpClient, err := httputil.NewClientFromConfigAndOptions(conf.HTTPClientConfig, "remote_storage", false) if err != nil { return nil, err } diff --git a/util/httputil/client.go b/util/httputil/client.go index e36e35361..bed76b243 100644 --- a/util/httputil/client.go +++ b/util/httputil/client.go @@ -34,6 +34,12 @@ func newClient(rt http.RoundTripper) *http.Client { // NewClientFromConfig returns a new HTTP client configured for the // given config.HTTPClientConfig. The name is used as go-conntrack metric label. func NewClientFromConfig(cfg config.HTTPClientConfig, name string) (*http.Client, error) { + return NewClientFromConfigAndOptions(cfg, name, true) +} + +// NewClientFromConfigAndOptions returns a new HTTP client configured for the +// given config.HTTPClientConfig. The name is used as go-conntrack metric label. +func NewClientFromConfigAndOptions(cfg config.HTTPClientConfig, name string, disableKeepAlives bool) (*http.Client, error) { tlsConfig, err := NewTLSConfig(cfg.TLSConfig) if err != nil { return nil, err @@ -43,7 +49,7 @@ func NewClientFromConfig(cfg config.HTTPClientConfig, name string) (*http.Client var rt http.RoundTripper = &http.Transport{ Proxy: http.ProxyURL(cfg.ProxyURL.URL), MaxIdleConns: 20000, - DisableKeepAlives: false, + DisableKeepAlives: disableKeepAlives, TLSClientConfig: tlsConfig, DisableCompression: true, // 5 minutes is typically above the maximum sane scrape interval. So we can From ee011d906d683278518fa3d8439d08b0871e93ff Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 21:28:17 +0100 Subject: [PATCH 07/14] Port remote read server to 2.0. --- storage/remote/client.go | 13 +- storage/remote/client_test.go | 5 +- storage/remote/codec.go | 192 ++++++++++++++++++++++----- storage/remote/codec_test.go | 113 ++++++++++++++++ storage/remote/queue_manager.go | 6 +- storage/remote/queue_manager_test.go | 40 +++--- storage/remote/read.go | 177 +++++++----------------- storage/remote/read_test.go | 181 +++---------------------- web/api/v1/api.go | 77 +++++++++++ web/api/v1/api_test.go | 103 ++++++++++++++ 10 files changed, 551 insertions(+), 356 deletions(-) create mode 100644 storage/remote/codec_test.go diff --git a/storage/remote/client.go b/storage/remote/client.go index 79ccb5910..462e7c5f7 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -29,7 +29,6 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/util/httputil" ) @@ -74,8 +73,7 @@ type recoverableError struct { } // Store sends a batch of samples to the HTTP endpoint. -func (c *Client) Store(samples model.Samples) error { - req := ToWriteRequest(samples) +func (c *Client) Store(req *prompb.WriteRequest) error { data, err := proto.Marshal(req) if err != nil { return err @@ -123,12 +121,7 @@ func (c Client) Name() string { } // Read reads from a remote endpoint. -func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labels.Matcher) ([]*prompb.TimeSeries, error) { - query, err := ToQuery(from, through, matchers) - if err != nil { - return nil, err - } - +func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { req := &prompb.ReadRequest{ // TODO: Support batching multiple queries into one read request, // as the protobuf interface allows for it. @@ -182,5 +175,5 @@ func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labe return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) } - return resp.Results[0].Timeseries, nil + return resp.Results[0], nil } diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 9f3293ceb..2b5893d6d 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" ) var longErrMessage = strings.Repeat("error message", maxErrMsgLen) @@ -65,14 +66,14 @@ func TestStoreHTTPErrorHandling(t *testing.T) { } c, err := NewClient(0, &ClientConfig{ - URL: &config.URL{serverURL}, + URL: &config.URL{URL: serverURL}, Timeout: model.Duration(time.Second), }) if err != nil { t.Fatal(err) } - err = c.Store(nil) + err = c.Store(&prompb.WriteRequest{}) if !reflect.DeepEqual(err, test.err) { t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 214d1468d..8de5ed182 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" ) // DecodeReadRequest reads a remote.Request from a http.Request. @@ -99,52 +100,166 @@ func ToQuery(from, to int64, matchers []*labels.Matcher) (*prompb.Query, error) } // FromQuery unpacks a Query proto. -func FromQuery(req *prompb.Query) (model.Time, model.Time, []*labels.Matcher, error) { +func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, error) { matchers, err := fromLabelMatchers(req.Matchers) if err != nil { return 0, 0, nil, err } - from := model.Time(req.StartTimestampMs) - to := model.Time(req.EndTimestampMs) - return from, to, matchers, nil + return req.StartTimestampMs, req.EndTimestampMs, matchers, nil } // ToQueryResult builds a QueryResult proto. -func ToQueryResult(matrix model.Matrix) *prompb.QueryResult { +func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) { resp := &prompb.QueryResult{} - for _, ss := range matrix { - ts := prompb.TimeSeries{ - Labels: MetricToLabelProtos(ss.Metric), - Samples: make([]*prompb.Sample, 0, len(ss.Values)), - } - for _, s := range ss.Values { - ts.Samples = append(ts.Samples, &prompb.Sample{ - Value: float64(s.Value), - Timestamp: int64(s.Timestamp), + for ss.Next() { + series := ss.At() + iter := series.Iterator() + samples := []*prompb.Sample{} + + for iter.Next() { + ts, val := iter.At() + samples = append(samples, &prompb.Sample{ + Timestamp: ts, + Value: val, }) } - resp.Timeseries = append(resp.Timeseries, &ts) + if err := iter.Err(); err != nil { + return nil, err + } + + resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{ + Labels: labelsToLabelsProto(series.Labels()), + Samples: samples, + }) } - return resp + if err := ss.Err(); err != nil { + return nil, err + } + return resp, nil } // FromQueryResult unpacks a QueryResult proto. -func FromQueryResult(resp *prompb.QueryResult) model.Matrix { - m := make(model.Matrix, 0, len(resp.Timeseries)) - for _, ts := range resp.Timeseries { - var ss model.SampleStream - ss.Metric = LabelProtosToMetric(ts.Labels) - ss.Values = make([]model.SamplePair, 0, len(ts.Samples)) - for _, s := range ts.Samples { - ss.Values = append(ss.Values, model.SamplePair{ - Value: model.SampleValue(s.Value), - Timestamp: model.Time(s.Timestamp), - }) +func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { + series := make([]storage.Series, 0, len(res.Timeseries)) + for _, ts := range res.Timeseries { + labels := labelProtosToLabels(ts.Labels) + if err := validateLabelsAndMetricName(labels); err != nil { + return errSeriesSet{err: err} } - m = append(m, &ss) - } - return m + series = append(series, &concreteSeries{ + labels: labels, + samples: ts.Samples, + }) + } + sort.Sort(byLabel(series)) + return &concreteSeriesSet{ + series: series, + } +} + +// errSeriesSet implements storage.SeriesSet, just returning an error. +type errSeriesSet struct { + err error +} + +func (errSeriesSet) Next() bool { + return false +} + +func (errSeriesSet) At() storage.Series { + return nil +} + +func (e errSeriesSet) Err() error { + return e.err +} + +// concreteSeriesSet implements storage.SeriesSet. +type concreteSeriesSet struct { + cur int + series []storage.Series +} + +func (c *concreteSeriesSet) Next() bool { + c.cur++ + return c.cur-1 < len(c.series) +} + +func (c *concreteSeriesSet) At() storage.Series { + return c.series[c.cur-1] +} + +func (c *concreteSeriesSet) Err() error { + return nil +} + +// concreteSeries implementes storage.Series. +type concreteSeries struct { + labels labels.Labels + samples []*prompb.Sample +} + +func (c *concreteSeries) Labels() labels.Labels { + return c.labels +} + +func (c *concreteSeries) Iterator() storage.SeriesIterator { + return newConcreteSeriersIterator(c) +} + +// concreteSeriesIterator implements storage.SeriesIterator. +type concreteSeriesIterator struct { + cur int + series *concreteSeries +} + +func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator { + return &concreteSeriesIterator{ + cur: -1, + series: series, + } +} + +// Seek implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Seek(t int64) bool { + c.cur = sort.Search(len(c.series.samples), func(n int) bool { + return c.series.samples[n].Timestamp >= t + }) + return c.cur < len(c.series.samples) +} + +// At implements storage.SeriesIterator. +func (c *concreteSeriesIterator) At() (t int64, v float64) { + s := c.series.samples[c.cur] + return s.Timestamp, s.Value +} + +// Next implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Next() bool { + c.cur++ + return c.cur < len(c.series.samples) +} + +// Err implements storage.SeriesIterator. +func (c *concreteSeriesIterator) Err() error { + return nil +} + +// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read. +func validateLabelsAndMetricName(ls labels.Labels) error { + for _, l := range ls { + if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { + return fmt.Errorf("Invalid metric name: %v", l.Value) + } + if !model.LabelName(l.Name).IsValid() { + return fmt.Errorf("Invalid label name: %v", l.Name) + } + if !model.LabelValue(l.Value).IsValid() { + return fmt.Errorf("Invalid label value: %v", l.Value) + } + } + return nil } func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) { @@ -199,14 +314,14 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro // MetricToLabelProtos builds a []*prompb.Label from a model.Metric func MetricToLabelProtos(metric model.Metric) []*prompb.Label { - labelPairs := make([]*prompb.Label, 0, len(metric)) + labels := make([]*prompb.Label, 0, len(metric)) for k, v := range metric { - labelPairs = append(labelPairs, &prompb.Label{ + labels = append(labels, &prompb.Label{ Name: string(k), Value: string(v), }) } - return labelPairs + return labels } // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric @@ -230,6 +345,17 @@ func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels { return result } +func labelsToLabelsProto(labels labels.Labels) []*prompb.Label { + result := make([]*prompb.Label, 0, len(labels)) + for _, l := range labels { + result = append(result, &prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + return result +} + func labelsToMetric(ls labels.Labels) model.Metric { metric := make(model.Metric, len(ls)) for _, l := range ls { diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go new file mode 100644 index 000000000..d62e6da45 --- /dev/null +++ b/storage/remote/codec_test.go @@ -0,0 +1,113 @@ +package remote + +import ( + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +func TestValidateLabelsAndMetricName(t *testing.T) { + tests := []struct { + input labels.Labels + expectedErr string + shouldPass bool + }{ + { + input: labels.FromStrings( + "__name__", "name", + "labelName", "labelValue", + ), + expectedErr: "", + shouldPass: true, + }, + { + input: labels.FromStrings( + "__name__", "name", + "_labelName", "labelValue", + ), + expectedErr: "", + shouldPass: true, + }, + { + input: labels.FromStrings( + "__name__", "name", + "@labelName", "labelValue", + ), + expectedErr: "Invalid label name: @labelName", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "123labelName", "labelValue", + ), + expectedErr: "Invalid label name: 123labelName", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "", "labelValue", + ), + expectedErr: "Invalid label name: ", + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "name", + "labelName", string([]byte{0xff}), + ), + expectedErr: "Invalid label value: " + string([]byte{0xff}), + shouldPass: false, + }, + { + input: labels.FromStrings( + "__name__", "@invalid_name", + ), + expectedErr: "Invalid metric name: @invalid_name", + shouldPass: false, + }, + } + + for _, test := range tests { + err := validateLabelsAndMetricName(test.input) + if test.shouldPass != (err == nil) { + if test.shouldPass { + t.Fatalf("Test should pass, got unexpected error: %v", err) + } else { + t.Fatalf("Test should fail, unexpected error, got: %v, expected: %v", err, test.expectedErr) + } + } + } +} + +func TestConcreteSeriesSet(t *testing.T) { + series1 := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}}, + } + series2 := &concreteSeries{ + labels: labels.FromStrings("foo", "baz"), + samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}}, + } + c := &concreteSeriesSet{ + series: []storage.Series{series1, series2}, + } + if !c.Next() { + t.Fatalf("Expected Next() to be true.") + } + if c.At() != series1 { + t.Fatalf("Unexpected series returned.") + } + if !c.Next() { + t.Fatalf("Expected Next() to be true.") + } + if c.At() != series2 { + t.Fatalf("Unexpected series returned.") + } + if c.Next() { + t.Fatalf("Expected Next() to be false.") + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 591c5e9a2..e47e421fb 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/relabel" ) @@ -128,7 +129,7 @@ func init() { // external timeseries database. type StorageClient interface { // Store stores the given samples in the remote storage. - Store(model.Samples) error + Store(*prompb.WriteRequest) error // Name identifies the remote storage implementation. Name() string } @@ -466,7 +467,8 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) { backoff := s.qm.cfg.MinBackoff for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { begin := time.Now() - err := s.qm.client.Store(samples) + req := ToWriteRequest(samples) + err := s.qm.client.Store(req) sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) if err == nil { diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index c34dd94e9..9d2e7a34b 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -15,6 +15,7 @@ package remote import ( "fmt" + "reflect" "sync" "sync/atomic" "testing" @@ -22,19 +23,20 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" ) type TestStorageClient struct { - receivedSamples map[string]model.Samples - expectedSamples map[string]model.Samples + receivedSamples map[string][]*prompb.Sample + expectedSamples map[string][]*prompb.Sample wg sync.WaitGroup mtx sync.Mutex } func NewTestStorageClient() *TestStorageClient { return &TestStorageClient{ - receivedSamples: map[string]model.Samples{}, - expectedSamples: map[string]model.Samples{}, + receivedSamples: map[string][]*prompb.Sample{}, + expectedSamples: map[string][]*prompb.Sample{}, } } @@ -43,8 +45,11 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { defer c.mtx.Unlock() for _, s := range ss { - ts := s.Metric.String() - c.expectedSamples[ts] = append(c.expectedSamples[ts], s) + ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() + c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{ + Timestamp: int64(s.Timestamp), + Value: float64(s.Value), + }) } c.wg.Add(len(ss)) } @@ -55,23 +60,24 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { c.mtx.Lock() defer c.mtx.Unlock() for ts, expectedSamples := range c.expectedSamples { - for i, expected := range expectedSamples { - if !expected.Equal(c.receivedSamples[ts][i]) { - t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i]) - } + if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) { + t.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts]) } } } -func (c *TestStorageClient) Store(ss model.Samples) error { +func (c *TestStorageClient) Store(req *prompb.WriteRequest) error { c.mtx.Lock() defer c.mtx.Unlock() - - for _, s := range ss { - ts := s.Metric.String() - c.receivedSamples[ts] = append(c.receivedSamples[ts], s) + count := 0 + for _, ts := range req.Timeseries { + labels := labelProtosToLabels(ts.Labels).String() + for _, sample := range ts.Samples { + count++ + c.receivedSamples[labels] = append(c.receivedSamples[labels], sample) + } } - c.wg.Add(-len(ss)) + c.wg.Add(-count) return nil } @@ -162,7 +168,7 @@ func NewTestBlockedStorageClient() *TestBlockingStorageClient { } } -func (c *TestBlockingStorageClient) Store(s model.Samples) error { +func (c *TestBlockingStorageClient) Store(_ *prompb.WriteRequest) error { atomic.AddUint64(&c.numCalls, 1) <-c.block return nil diff --git a/storage/remote/read.go b/storage/remote/read.go index e15a3b618..c466e7ddb 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -15,17 +15,14 @@ package remote import ( "context" - "fmt" - "sort" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" ) // Querier returns a new Querier on the storage. -func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { +func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { r.mtx.Lock() defer r.mtx.Unlock() @@ -47,6 +44,7 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, } } queriers = append(queriers, &querier{ + ctx: ctx, mint: mint, maxt: cmaxt, client: c, @@ -61,6 +59,7 @@ var newMergeQueriers = storage.NewMergeQuerier // Querier is an adapter to make a Client usable as a storage.Querier. type querier struct { + ctx context.Context mint, maxt int64 client *Client externalLabels model.LabelSet @@ -69,28 +68,20 @@ type querier struct { // Select returns a set of series that matches the given label matchers. func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { m, added := q.addExternalLabels(matchers) - res, err := q.client.Read(context.TODO(), q.mint, q.maxt, m) + + query, err := ToQuery(q.mint, q.maxt, m) if err != nil { return errSeriesSet{err: err} } - series := make([]storage.Series, 0, len(res)) - for _, ts := range res { - labels := labelProtosToLabels(ts.Labels) - removeLabels(&labels, added) - if err := validateLabelsAndMetricName(labels); err != nil { - return errSeriesSet{err: err} - } + res, err := q.client.Read(q.ctx, query) + if err != nil { + return errSeriesSet{err: err} + } - series = append(series, &concreteSeries{ - labels: labels, - samples: ts.Samples, - }) - } - sort.Sort(byLabel(series)) - return &concreteSeriesSet{ - series: series, - } + seriesSet := FromQueryResult(res) + + return newSeriesSetFilter(seriesSet, added) } type byLabel []storage.Series @@ -110,110 +101,6 @@ func (q *querier) Close() error { return nil } -// errSeriesSet implements storage.SeriesSet, just returning an error. -type errSeriesSet struct { - err error -} - -func (errSeriesSet) Next() bool { - return false -} - -func (errSeriesSet) At() storage.Series { - return nil -} - -func (e errSeriesSet) Err() error { - return e.err -} - -// concreteSeriesSet implements storage.SeriesSet. -type concreteSeriesSet struct { - cur int - series []storage.Series -} - -func (c *concreteSeriesSet) Next() bool { - c.cur++ - return c.cur-1 < len(c.series) -} - -func (c *concreteSeriesSet) At() storage.Series { - return c.series[c.cur-1] -} - -func (c *concreteSeriesSet) Err() error { - return nil -} - -// concreteSeries implementes storage.Series. -type concreteSeries struct { - labels labels.Labels - samples []*prompb.Sample -} - -func (c *concreteSeries) Labels() labels.Labels { - return c.labels -} - -func (c *concreteSeries) Iterator() storage.SeriesIterator { - return newConcreteSeriersIterator(c) -} - -// concreteSeriesIterator implements storage.SeriesIterator. -type concreteSeriesIterator struct { - cur int - series *concreteSeries -} - -func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator { - return &concreteSeriesIterator{ - cur: -1, - series: series, - } -} - -// Seek implements storage.SeriesIterator. -func (c *concreteSeriesIterator) Seek(t int64) bool { - c.cur = sort.Search(len(c.series.samples), func(n int) bool { - return c.series.samples[n].Timestamp >= t - }) - return c.cur < len(c.series.samples) -} - -// At implements storage.SeriesIterator. -func (c *concreteSeriesIterator) At() (t int64, v float64) { - s := c.series.samples[c.cur] - return s.Timestamp, s.Value -} - -// Next implements storage.SeriesIterator. -func (c *concreteSeriesIterator) Next() bool { - c.cur++ - return c.cur < len(c.series.samples) -} - -// Err implements storage.SeriesIterator. -func (c *concreteSeriesIterator) Err() error { - return nil -} - -// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read. -func validateLabelsAndMetricName(ls labels.Labels) error { - for _, l := range ls { - if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) { - return fmt.Errorf("Invalid metric name: %v", l.Value) - } - if !model.LabelName(l.Name).IsValid() { - return fmt.Errorf("Invalid label name: %v", l.Name) - } - if !model.LabelValue(l.Value).IsValid() { - return fmt.Errorf("Invalid label value: %v", l.Value) - } - } - return nil -} - // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. @@ -240,12 +127,38 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match return matchers, el } -func removeLabels(l *labels.Labels, toDelete model.LabelSet) { - for i := 0; i < len(*l); { - if _, ok := toDelete[model.LabelName((*l)[i].Name)]; ok { - *l = (*l)[:i+copy((*l)[i:], (*l)[i+1:])] - } else { - i++ - } +func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { + return &seriesSetFilter{ + SeriesSet: ss, + toFilter: toFilter, } } + +type seriesSetFilter struct { + storage.SeriesSet + toFilter model.LabelSet +} + +func (ssf seriesSetFilter) At() storage.Series { + return seriesFilter{ + Series: ssf.SeriesSet.At(), + toFilter: ssf.toFilter, + } +} + +type seriesFilter struct { + storage.Series + toFilter model.LabelSet +} + +func (sf seriesFilter) Labels() labels.Labels { + labels := sf.Series.Labels() + for i := 0; i < len(labels); { + if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok { + labels = labels[:i+copy(labels[i:], labels[i+1:])] + continue + } + i++ + } + return labels +} diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 376dce3a3..8f0dc0cae 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -27,128 +27,6 @@ import ( "github.com/prometheus/prometheus/storage" ) -func TestValidateLabelsAndMetricName(t *testing.T) { - tests := []struct { - result model.Matrix - expectedErr string - shouldPass bool - }{ - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "labelName": "labelValue", - }, - }, - }, - expectedErr: "", - shouldPass: true, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "_labelName": "labelValue", - }, - }, - }, - expectedErr: "", - shouldPass: true, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "@labelName": "labelValue", - }, - }, - }, - expectedErr: "Invalid label name: @labelName", - shouldPass: false, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "123labelName": "labelValue", - }, - }, - }, - expectedErr: "Invalid label name: 123labelName", - shouldPass: false, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "": "labelValue", - }, - }, - }, - expectedErr: "Invalid label name: ", - shouldPass: false, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "name", - "labelName": model.LabelValue([]byte{0xff}), - }, - }, - }, - expectedErr: "Invalid label value: " + string([]byte{0xff}), - shouldPass: false, - }, - { - result: model.Matrix{ - &model.SampleStream{ - Metric: model.Metric{ - "__name__": "@invalid_name", - }, - }, - }, - expectedErr: "Invalid metric name: @invalid_name", - shouldPass: false, - }, - } - - for _, test := range tests { - var err error - for _, ss := range test.result { - ls := make(labels.Labels, 0, len(ss.Metric)) - for k, v := range ss.Metric { - ls = append(ls, labels.Label{ - Name: string(k), - Value: string(v), - }) - } - err = validateLabelsAndMetricName(ls) - if err != nil { - break - } - } - if test.shouldPass { - if err != nil { - t.Fatalf("Test should pass, got unexpected error: %v", err) - } - continue - } - if err != nil { - if err.Error() != test.expectedErr { - t.Fatalf("Unexpected error, got: %v, expected: %v", err, test.expectedErr) - } - } else { - t.Fatalf("Expected error, got none") - } - } -} - func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { m, err := labels.NewMatcher(mt, name, val) if err != nil { @@ -222,53 +100,36 @@ func TestAddExternalLabels(t *testing.T) { func TestRemoveLabels(t *testing.T) { tests := []struct { - in labels.Labels - out labels.Labels + in *prompb.QueryResult toRemove model.LabelSet + + expected *prompb.QueryResult }{ { toRemove: model.LabelSet{"foo": "bar"}, - in: labels.FromStrings("foo", "bar", "a", "b"), - out: labels.FromStrings("a", "b"), + in: &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + {Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []*prompb.Sample{}}, + }, + }, + expected: &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + {Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []*prompb.Sample{}}, + }, + }, }, } - for i, test := range tests { - in := test.in.Copy() - removeLabels(&in, test.toRemove) - - if !reflect.DeepEqual(in, test.out) { - t.Fatalf("%d. unexpected labels; want %v, got %v", i, test.out, in) + for i, tc := range tests { + filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove) + have, err := ToQueryResult(filtered) + if err != nil { + t.Fatal(err) } - } -} -func TestConcreteSeriesSet(t *testing.T) { - series1 := &concreteSeries{ - labels: labels.FromStrings("foo", "bar"), - samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}}, - } - series2 := &concreteSeries{ - labels: labels.FromStrings("foo", "baz"), - samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}}, - } - c := &concreteSeriesSet{ - series: []storage.Series{series1, series2}, - } - if !c.Next() { - t.Fatalf("Expected Next() to be true.") - } - if c.At() != series1 { - t.Fatalf("Unexpected series returned.") - } - if !c.Next() { - t.Fatalf("Expected Next() to be true.") - } - if c.At() != series2 { - t.Fatalf("Unexpected series returned.") - } - if c.Next() { - t.Fatalf("Expected Next() to be false.") + if !reflect.DeepEqual(have, tc.expected) { + t.Fatalf("%d. unexpected labels; want %v, got %v", i, tc.expected, have) + } } } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 81e55e524..e2e78f438 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -31,9 +31,11 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/util/httputil" ) @@ -161,6 +163,7 @@ func (api *API) Register(r *route.Router) { r.Get("/alertmanagers", instr("alertmanagers", api.alertmanagers)) r.Get("/status/config", instr("config", api.serveConfig)) + r.Post("/read", api.ready(prometheus.InstrumentHandler("read", http.HandlerFunc(api.remoteRead)))) } type queryData struct { @@ -451,6 +454,80 @@ func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) { return cfg, nil } +func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { + req, err := remote.DecodeReadRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + resp := prompb.ReadResponse{ + Results: make([]*prompb.QueryResult, len(req.Queries)), + } + for i, query := range req.Queries { + from, through, matchers, err := remote.FromQuery(query) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + querier, err := api.Queryable.Querier(r.Context(), from, through) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer querier.Close() + + // Change equality matchers which match external labels + // to a matcher that looks for an empty label, + // as that label should not be present in the storage. + externalLabels := api.config().GlobalConfig.ExternalLabels.Clone() + filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) + for _, m := range matchers { + value := externalLabels[model.LabelName(m.Name)] + if m.Type == labels.MatchEqual && value == model.LabelValue(m.Value) { + matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + filteredMatchers = append(filteredMatchers, matcher) + } else { + filteredMatchers = append(filteredMatchers, m) + } + } + + resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Add external labels back in. + for _, ts := range resp.Results[i].Timeseries { + globalUsed := map[string]struct{}{} + for _, l := range ts.Labels { + if _, ok := externalLabels[model.LabelName(l.Name)]; ok { + globalUsed[l.Name] = struct{}{} + } + } + for ln, lv := range externalLabels { + if _, ok := globalUsed[string(ln)]; !ok { + ts.Labels = append(ts.Labels, &prompb.Label{ + Name: string(ln), + Value: string(lv), + }) + } + } + } + } + + if err := remote.EncodeReadResponse(&resp, w); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + func respond(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index b306f82d1..a29475c77 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -14,6 +14,7 @@ package v1 import ( + "bytes" "context" "encoding/json" "errors" @@ -26,14 +27,19 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/weaveworks/common/test" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" + "github.com/prometheus/prometheus/storage/remote" ) type targetRetrieverFunc func() []*retrieval.Target @@ -476,6 +482,103 @@ func TestEndpoints(t *testing.T) { } } +func TestReadEndpoint(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar",baz="qux"} 1 + `) + if err != nil { + t.Fatal(err) + } + defer suite.Close() + + if err := suite.Run(); err != nil { + t.Fatal(err) + } + + api := &API{ + Queryable: suite.Storage(), + QueryEngine: suite.QueryEngine(), + config: func() config.Config { + return config.Config{ + GlobalConfig: config.GlobalConfig{ + ExternalLabels: model.LabelSet{ + "baz": "a", + "b": "c", + "d": "e", + }, + }, + } + }, + } + + // Encode the request. + matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") + if err != nil { + t.Fatal(err) + } + matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") + if err != nil { + t.Fatal(err) + } + query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}) + if err != nil { + t.Fatal(err) + } + req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} + data, err := proto.Marshal(req) + if err != nil { + t.Fatal(err) + } + compressed := snappy.Encode(nil, data) + request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) + if err != nil { + t.Fatal(err) + } + recorder := httptest.NewRecorder() + api.remoteRead(recorder, request) + + // Decode the response. + compressed, err = ioutil.ReadAll(recorder.Result().Body) + if err != nil { + t.Fatal(err) + } + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + t.Fatal(err) + } + + var resp prompb.ReadResponse + err = proto.Unmarshal(uncompressed, &resp) + if err != nil { + t.Fatal(err) + } + + if len(resp.Results) != 1 { + t.Fatalf("Expected 1 result, got %d", len(resp.Results)) + } + + result := resp.Results[0] + expected := &prompb.QueryResult{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "test_metric1"}, + {Name: "baz", Value: "qux"}, + {Name: "foo", Value: "bar"}, + {Name: "b", Value: "c"}, + {Name: "d", Value: "e"}, + }, + Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}}, + }, + }, + } + if !reflect.DeepEqual(result, expected) { + t.Fatalf(test.Diff(expected, result)) + t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected) + } +} + func TestRespondSuccess(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { respond(w, "test") From e8c264e47a14dd002ac37c0a6c5680473a1b4c00 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 21:28:57 +0100 Subject: [PATCH 08/14] Add comment. --- storage/remote/write.go | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/remote/write.go b/storage/remote/write.go index 514dcbe43..2687b9d83 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -38,6 +38,7 @@ func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, nil } +// AddFast implements storage.Appender. func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error { _, err := s.Add(l, t, v) return err From 2ae04d0e79226e51893ae96af36d7aff4a0934f9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 21:39:40 +0100 Subject: [PATCH 09/14] Add license header. --- storage/remote/codec_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index d62e6da45..8cb9a2a95 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package remote import ( From b22485bef033c3ed6dc4ad011b41d8ac45eb6453 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 23 Oct 2017 21:59:32 +0100 Subject: [PATCH 10/14] Remove spurious test import. --- web/api/v1/api_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index a29475c77..fbda1b270 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -31,7 +31,6 @@ import ( "github.com/golang/snappy" "github.com/prometheus/common/model" "github.com/prometheus/common/route" - "github.com/weaveworks/common/test" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" @@ -574,7 +573,6 @@ func TestReadEndpoint(t *testing.T) { }, } if !reflect.DeepEqual(result, expected) { - t.Fatalf(test.Diff(expected, result)) t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected) } } From 6e4d4ea402b84190f82618bec30275b7f7fb9229 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 24 Oct 2017 15:20:05 +0100 Subject: [PATCH 11/14] Initialise some counters in remote storage API. --- storage/remote/queue_manager.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e47e421fb..5e30fd0f6 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -183,6 +183,12 @@ func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels m numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) + // Initialise counter labels to zero. + sentBatchDuration.WithLabelValues(t.queueName) + succeededSamplesTotal.WithLabelValues(t.queueName) + failedSamplesTotal.WithLabelValues(t.queueName) + droppedSamplesTotal.WithLabelValues(t.queueName) + return t } From 746752b946b43c4e3f2a94822619afccbb5cbab6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 26 Oct 2017 11:44:49 +0100 Subject: [PATCH 12/14] Merge external labels in order. --- storage/remote/codec.go | 3 +++ web/api/v1/api.go | 56 ++++++++++++++++++++++++++++++----------- web/api/v1/api_test.go | 4 +-- 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 8de5ed182..4ed9662c0 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -321,6 +321,9 @@ func MetricToLabelProtos(metric model.Metric) []*prompb.Label { Value: string(v), }) } + sort.Slice(labels, func(i int, j int) bool { + return labels[i].Name < labels[j].Name + }) return labels } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index e2e78f438..2289738be 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -21,6 +21,7 @@ import ( "math" "net/http" "net/url" + "sort" "strconv" "time" @@ -503,22 +504,20 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } - // Add external labels back in. + // Add external labels back in, in sorted order. + sortedExternalLabels := make([]*prompb.Label, 0, len(externalLabels)) + for name, value := range externalLabels { + sortedExternalLabels = append(sortedExternalLabels, &prompb.Label{ + Name: string(name), + Value: string(value), + }) + } + sort.Slice(sortedExternalLabels, func(i, j int) bool { + return sortedExternalLabels[i].Name < sortedExternalLabels[j].Name + }) + for _, ts := range resp.Results[i].Timeseries { - globalUsed := map[string]struct{}{} - for _, l := range ts.Labels { - if _, ok := externalLabels[model.LabelName(l.Name)]; ok { - globalUsed[l.Name] = struct{}{} - } - } - for ln, lv := range externalLabels { - if _, ok := globalUsed[string(ln)]; !ok { - ts.Labels = append(ts.Labels, &prompb.Label{ - Name: string(ln), - Value: string(lv), - }) - } - } + ts.Labels = mergeLabels(ts.Labels, sortedExternalLabels) } } @@ -528,6 +527,33 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } +// mergeLabels merges two sets of sorted proto labels, preferring those in +// primary to those in secondary when there is an overlap. +func mergeLabels(primary, secondary []*prompb.Label) []*prompb.Label { + result := make([]*prompb.Label, 0, len(primary)+len(secondary)) + i, j := 0, 0 + for i < len(primary) && j < len(secondary) { + if primary[i].Name < secondary[j].Name { + result = append(result, primary[i]) + i++ + } else if primary[i].Name > secondary[j].Name { + result = append(result, secondary[j]) + j++ + } else { + result = append(result, primary[i]) + i++ + j++ + } + } + for ; i < len(primary); i++ { + result = append(result, primary[i]) + } + for ; j < len(secondary); j++ { + result = append(result, secondary[j]) + } + return result +} + func respond(w http.ResponseWriter, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index fbda1b270..3415e0ac3 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -563,10 +563,10 @@ func TestReadEndpoint(t *testing.T) { { Labels: []*prompb.Label{ {Name: "__name__", Value: "test_metric1"}, - {Name: "baz", Value: "qux"}, - {Name: "foo", Value: "bar"}, {Name: "b", Value: "c"}, + {Name: "baz", Value: "qux"}, {Name: "d", Value: "e"}, + {Name: "foo", Value: "bar"}, }, Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}}, }, From 9c3c98e8de5bd1ef5e9e3aa76d4510a767f02a25 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 26 Oct 2017 13:43:48 +0100 Subject: [PATCH 13/14] Revert "Port 'Don't disable HTTP keep-alives for remote storage connections.' to 2.0 (see #3173)" This reverts commit 0997191b182bc40a1e050856084fe9b8dc46f523. --- storage/remote/client.go | 2 +- util/httputil/client.go | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/storage/remote/client.go b/storage/remote/client.go index 462e7c5f7..82e217690 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -54,7 +54,7 @@ type ClientConfig struct { // NewClient creates a new Client. func NewClient(index int, conf *ClientConfig) (*Client, error) { - httpClient, err := httputil.NewClientFromConfigAndOptions(conf.HTTPClientConfig, "remote_storage", false) + httpClient, err := httputil.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage") if err != nil { return nil, err } diff --git a/util/httputil/client.go b/util/httputil/client.go index bed76b243..e36e35361 100644 --- a/util/httputil/client.go +++ b/util/httputil/client.go @@ -34,12 +34,6 @@ func newClient(rt http.RoundTripper) *http.Client { // NewClientFromConfig returns a new HTTP client configured for the // given config.HTTPClientConfig. The name is used as go-conntrack metric label. func NewClientFromConfig(cfg config.HTTPClientConfig, name string) (*http.Client, error) { - return NewClientFromConfigAndOptions(cfg, name, true) -} - -// NewClientFromConfigAndOptions returns a new HTTP client configured for the -// given config.HTTPClientConfig. The name is used as go-conntrack metric label. -func NewClientFromConfigAndOptions(cfg config.HTTPClientConfig, name string, disableKeepAlives bool) (*http.Client, error) { tlsConfig, err := NewTLSConfig(cfg.TLSConfig) if err != nil { return nil, err @@ -49,7 +43,7 @@ func NewClientFromConfigAndOptions(cfg config.HTTPClientConfig, name string, dis var rt http.RoundTripper = &http.Transport{ Proxy: http.ProxyURL(cfg.ProxyURL.URL), MaxIdleConns: 20000, - DisableKeepAlives: disableKeepAlives, + DisableKeepAlives: false, TLSClientConfig: tlsConfig, DisableCompression: true, // 5 minutes is typically above the maximum sane scrape interval. So we can From 1af3ef431da322cef79985b827cd40d8793563ad Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 26 Oct 2017 13:50:39 +0100 Subject: [PATCH 14/14] s/TestRemoveLabels/TestSeriesSetFilter/ --- storage/remote/read_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 8f0dc0cae..2fce33189 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -98,7 +98,7 @@ func TestAddExternalLabels(t *testing.T) { } } -func TestRemoveLabels(t *testing.T) { +func TestSeriesSetFilter(t *testing.T) { tests := []struct { in *prompb.QueryResult toRemove model.LabelSet