Update example remote adapters for change in proto location.

pull/2850/head
Tom Wilkie 2017-07-19 16:39:02 +01:00
parent ec999ff397
commit cf105f9d57
3 changed files with 38 additions and 38 deletions

View File

@ -22,7 +22,7 @@ import (
"github.com/golang/snappy" "github.com/golang/snappy"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/prompb"
) )
func main() { func main() {
@ -39,7 +39,7 @@ func main() {
return return
} }
var req remote.WriteRequest var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil { if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
@ -53,7 +53,7 @@ func main() {
fmt.Println(m) fmt.Println(m)
for _, s := range ts.Samples { for _, s := range ts.Samples {
fmt.Printf(" %f %d\n", s.Value, s.TimestampMs) fmt.Printf(" %f %d\n", s.Value, s.Timestamp)
} }
} }
}) })

View File

@ -22,7 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/prompb"
influx "github.com/influxdata/influxdb/client/v2" influx "github.com/influxdata/influxdb/client/v2"
) )
@ -101,8 +101,8 @@ func (c *Client) Write(samples model.Samples) error {
return c.client.Write(bps) return c.client.Write(bps)
} }
func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
labelsToSeries := map[string]*remote.TimeSeries{} labelsToSeries := map[string]*prompb.TimeSeries{}
for _, q := range req.Queries { for _, q := range req.Queries {
command, err := c.buildCommand(q) command, err := c.buildCommand(q)
if err != nil { if err != nil {
@ -123,9 +123,9 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
} }
} }
resp := remote.ReadResponse{ resp := prompb.ReadResponse{
Results: []*remote.QueryResult{ Results: []*prompb.QueryResult{
{Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries))}, {Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))},
}, },
} }
for _, ts := range labelsToSeries { for _, ts := range labelsToSeries {
@ -134,7 +134,7 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
return &resp, nil return &resp, nil
} }
func (c *Client) buildCommand(q *remote.Query) (string, error) { func (c *Client) buildCommand(q *prompb.Query) (string, error) {
matchers := make([]string, 0, len(q.Matchers)) matchers := make([]string, 0, len(q.Matchers))
// If we don't find a metric name matcher, query all metrics // If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default. // (InfluxDB measurements) by default.
@ -142,9 +142,9 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) {
for _, m := range q.Matchers { for _, m := range q.Matchers {
if m.Name == model.MetricNameLabel { if m.Name == model.MetricNameLabel {
switch m.Type { switch m.Type {
case remote.MatchType_EQUAL: case prompb.LabelMatcher_EQ:
from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value) from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value)
case remote.MatchType_REGEX_MATCH: case prompb.LabelMatcher_RE:
from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value)) from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value))
default: default:
// TODO: Figure out how to support these efficiently. // TODO: Figure out how to support these efficiently.
@ -154,13 +154,13 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) {
} }
switch m.Type { switch m.Type {
case remote.MatchType_EQUAL: case prompb.LabelMatcher_EQ:
matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_NOT_EQUAL: case prompb.LabelMatcher_NEQ:
matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
case remote.MatchType_REGEX_MATCH: case prompb.LabelMatcher_RE:
matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
case remote.MatchType_REGEX_NO_MATCH: case prompb.LabelMatcher_NRE:
matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
default: default:
return "", fmt.Errorf("unknown match type %v", m.Type) return "", fmt.Errorf("unknown match type %v", m.Type)
@ -180,13 +180,13 @@ func escapeSlashes(str string) string {
return strings.Replace(str, `/`, `\/`, -1) return strings.Replace(str, `/`, `\/`, -1)
} }
func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error { func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error {
for _, r := range results { for _, r := range results {
for _, s := range r.Series { for _, s := range r.Series {
k := concatLabels(s.Tags) k := concatLabels(s.Tags)
ts, ok := labelsToSeries[k] ts, ok := labelsToSeries[k]
if !ok { if !ok {
ts = &remote.TimeSeries{ ts = &prompb.TimeSeries{
Labels: tagsToLabelPairs(s.Name, s.Tags), Labels: tagsToLabelPairs(s.Name, s.Tags),
} }
labelsToSeries[k] = ts labelsToSeries[k] = ts
@ -214,8 +214,8 @@ func concatLabels(labels map[string]string) string {
return strings.Join(pairs, separator) return strings.Join(pairs, separator)
} }
func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label {
pairs := make([]*remote.LabelPair, 0, len(tags)) pairs := make([]*prompb.Label, 0, len(tags))
for k, v := range tags { for k, v := range tags {
if v == "" { if v == "" {
// If we select metrics with different sets of labels names, // If we select metrics with different sets of labels names,
@ -226,20 +226,20 @@ func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
// to make the result correct. // to make the result correct.
continue continue
} }
pairs = append(pairs, &remote.LabelPair{ pairs = append(pairs, &prompb.Label{
Name: k, Name: k,
Value: v, Value: v,
}) })
} }
pairs = append(pairs, &remote.LabelPair{ pairs = append(pairs, &prompb.Label{
Name: model.MetricNameLabel, Name: model.MetricNameLabel,
Value: name, Value: name,
}) })
return pairs return pairs
} }
func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) {
samples := make([]*remote.Sample, 0, len(values)) samples := make([]*prompb.Sample, 0, len(values))
for _, v := range values { for _, v := range values {
if len(v) != 2 { if len(v) != 2 {
return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v) return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
@ -265,9 +265,9 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
} }
samples = append(samples, &remote.Sample{ samples = append(samples, &prompb.Sample{
TimestampMs: timestamp, Timestamp: timestamp,
Value: value, Value: value,
}) })
} }
return samples, nil return samples, nil
@ -275,14 +275,14 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
// mergeSamples merges two lists of sample pairs and removes duplicate // mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp. // timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []*remote.Sample) []*remote.Sample { func mergeSamples(a, b []*prompb.Sample) []*prompb.Sample {
result := make([]*remote.Sample, 0, len(a)+len(b)) result := make([]*prompb.Sample, 0, len(a)+len(b))
i, j := 0, 0 i, j := 0, 0
for i < len(a) && j < len(b) { for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs { if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i]) result = append(result, a[i])
i++ i++
} else if a[i].TimestampMs > b[j].TimestampMs { } else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j]) result = append(result, b[j])
j++ j++
} else { } else {

View File

@ -36,7 +36,7 @@ import (
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb"
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb"
"github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/prompb"
) )
type config struct { type config struct {
@ -146,7 +146,7 @@ type writer interface {
} }
type reader interface { type reader interface {
Read(req *remote.ReadRequest) (*remote.ReadResponse, error) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error)
Name() string Name() string
} }
@ -196,7 +196,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return return
} }
var req remote.WriteRequest var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil { if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
@ -229,7 +229,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return return
} }
var req remote.ReadRequest var req prompb.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil { if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
@ -242,7 +242,7 @@ func serve(addr string, writers []writer, readers []reader) error {
} }
reader := readers[0] reader := readers[0]
var resp *remote.ReadResponse var resp *prompb.ReadResponse
resp, err = reader.Read(&req) resp, err = reader.Read(&req)
if err != nil { if err != nil {
log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query") log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query")
@ -269,7 +269,7 @@ func serve(addr string, writers []writer, readers []reader) error {
return http.ListenAndServe(addr, nil) return http.ListenAndServe(addr, nil)
} }
func protoToSamples(req *remote.WriteRequest) model.Samples { func protoToSamples(req *prompb.WriteRequest) model.Samples {
var samples model.Samples var samples model.Samples
for _, ts := range req.Timeseries { for _, ts := range req.Timeseries {
metric := make(model.Metric, len(ts.Labels)) metric := make(model.Metric, len(ts.Labels))
@ -281,7 +281,7 @@ func protoToSamples(req *remote.WriteRequest) model.Samples {
samples = append(samples, &model.Sample{ samples = append(samples, &model.Sample{
Metric: metric, Metric: metric,
Value: model.SampleValue(s.Value), Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.TimestampMs), Timestamp: model.Time(s.Timestamp),
}) })
} }
} }