From b16371595daa6eddff192b63d3f570d2a32bb858 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 31 Jan 2017 17:44:22 +0100 Subject: [PATCH] Add standalone remote storage bridge example In preparation for removing specific remote storage implementations, this offers an example of how to achieve the same in a separate process. Rather than having three separate bridges for OpenTSDB, InfluxDB, and Graphite, I decided to support all in one binary. For now, this is in the example documenation directory, but perhaps we will want to make a first-class project / repository out of it. --- .../{ => example_receiver}/README.md | 2 +- .../{ => example_receiver}/server.go | 0 .../remote_storage_bridge/README.md | 35 +++ .../remote_storage_bridge/main.go | 231 ++++++++++++++++++ 4 files changed, 267 insertions(+), 1 deletion(-) rename documentation/examples/remote_storage/{ => example_receiver}/README.md (94%) rename documentation/examples/remote_storage/{ => example_receiver}/server.go (100%) create mode 100644 documentation/examples/remote_storage/remote_storage_bridge/README.md create mode 100644 documentation/examples/remote_storage/remote_storage_bridge/main.go diff --git a/documentation/examples/remote_storage/README.md b/documentation/examples/remote_storage/example_receiver/README.md similarity index 94% rename from documentation/examples/remote_storage/README.md rename to documentation/examples/remote_storage/example_receiver/README.md index 483bb22dc..3a0be8c0b 100644 --- a/documentation/examples/remote_storage/README.md +++ b/documentation/examples/remote_storage/example_receiver/README.md @@ -7,7 +7,7 @@ To use it: ``` go build -./remote_storage +./example_receiver ``` ...and then add the following to your `prometheus.yml`: diff --git a/documentation/examples/remote_storage/server.go b/documentation/examples/remote_storage/example_receiver/server.go similarity index 100% rename from documentation/examples/remote_storage/server.go rename to documentation/examples/remote_storage/example_receiver/server.go diff --git a/documentation/examples/remote_storage/remote_storage_bridge/README.md b/documentation/examples/remote_storage/remote_storage_bridge/README.md new file mode 100644 index 000000000..ad194c716 --- /dev/null +++ b/documentation/examples/remote_storage/remote_storage_bridge/README.md @@ -0,0 +1,35 @@ +# Remote storage bridge + +This is a bridge that receives samples in Prometheus's remote storage +format and forwards them to Graphite, InfluxDB, or OpenTSDB. It is meant +as a replacement for the built-in specific remote storage implementations +that have been removed from Prometheus. + +## Building + +``` +go build +``` + +## Running + +Example: + +``` +./remote_storage_bridge -graphite-address=localhost:8080 -opentsdb-url=http://localhost:8081/ +``` + +To show all flags: + +``` +./remote_storage_bridge -h +``` + +## Configuring Prometheus + +To configure Prometheus to send samples to this bridge, add the following to your `prometheus.yml`: + +```yaml +remote_write: + url: "http://localhost:9201/receive" +``` \ No newline at end of file diff --git a/documentation/examples/remote_storage/remote_storage_bridge/main.go b/documentation/examples/remote_storage/remote_storage_bridge/main.go new file mode 100644 index 000000000..b5b3f12ab --- /dev/null +++ b/documentation/examples/remote_storage/remote_storage_bridge/main.go @@ -0,0 +1,231 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The main package for the Prometheus server executable. +package main + +import ( + "flag" + "io/ioutil" + "net/http" + _ "net/http/pprof" + "net/url" + "os" + "sync" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/storage/remote/graphite" + "github.com/prometheus/prometheus/storage/remote/influxdb" + "github.com/prometheus/prometheus/storage/remote/opentsdb" + + influx "github.com/influxdb/influxdb/client" +) + +type config struct { + graphiteAddress string + graphiteTransport string + graphitePrefix string + opentsdbURL string + influxdbURL string + influxdbRetentionPolicy string + influxdbUsername string + influxdbDatabase string + influxdbPassword string + remoteTimeout time.Duration + listenAddr string + telemetryPath string +} + +var ( + receivedSamples = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "received_samples_total", + Help: "Total number of received samples.", + }, + ) + sentSamples = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "sent_samples_total", + Help: "Total number of processed samples sent to remote storage.", + }, + []string{"remote"}, + ) + failedSamples = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "failed_samples_total", + Help: "Total number of processed samples which failed on send to remote storage.", + }, + []string{"remote"}, + ) + sentBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + }, + []string{"remote"}, + ) +) + +func init() { + prometheus.MustRegister(receivedSamples) + prometheus.MustRegister(sentSamples) + prometheus.MustRegister(failedSamples) + prometheus.MustRegister(sentBatchDuration) +} + +func main() { + cfg := parseFlags() + http.Handle(cfg.telemetryPath, prometheus.Handler()) + + clients := buildClients(cfg) + serve(cfg.listenAddr, clients) +} + +func parseFlags() *config { + cfg := &config{ + influxdbPassword: os.Getenv("INFLUXDB_PW"), + } + + flag.StringVar(&cfg.graphiteAddress, "graphite-address", "", + "The host:port of the Graphite server to send samples to. None, if empty.", + ) + flag.StringVar(&cfg.graphiteTransport, "graphite-transport", "tcp", + "Transport protocol to use to communicate with Graphite. 'tcp', if empty.", + ) + flag.StringVar(&cfg.graphitePrefix, "graphite-prefix", "", + "The prefix to prepend to all metrics exported to Graphite. None, if empty.", + ) + flag.StringVar(&cfg.opentsdbURL, "opentsdb-url", "", + "The URL of the remote OpenTSDB server to send samples to. None, if empty.", + ) + flag.StringVar(&cfg.influxdbURL, "influxdb-url", "", + "The URL of the remote InfluxDB server to send samples to. None, if empty.", + ) + flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default", + "The InfluxDB retention policy to use.", + ) + flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "", + "The username to use when sending samples to InfluxDB. The corresponding password must be provided via the INFLUXDB_PW environment variable.", + ) + flag.StringVar(&cfg.influxdbDatabase, "influxdb.database", "prometheus", + "The name of the database to use for storing samples in InfluxDB.", + ) + flag.DurationVar(&cfg.remoteTimeout, "send-timeout", 30*time.Second, + "The timeout to use when sending samples to the remote storage.", + ) + flag.StringVar(&cfg.listenAddr, "web.listen-address", ":9201", "Address to listen on for web endpoints.") + flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.") + + flag.Parse() + + return cfg +} + +func buildClients(cfg *config) []remote.StorageClient { + var clients []remote.StorageClient + if cfg.graphiteAddress != "" { + c := graphite.NewClient( + cfg.graphiteAddress, cfg.graphiteTransport, + cfg.remoteTimeout, cfg.graphitePrefix) + clients = append(clients, c) + } + if cfg.opentsdbURL != "" { + c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout) + clients = append(clients, c) + } + if cfg.influxdbURL != "" { + url, err := url.Parse(cfg.influxdbURL) + if err != nil { + log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err) + } + conf := influx.Config{ + URL: *url, + Username: cfg.influxdbUsername, + Password: cfg.influxdbPassword, + Timeout: cfg.remoteTimeout, + } + c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy) + prometheus.MustRegister(c) + clients = append(clients, c) + } + return clients +} + +func serve(addr string, clients []remote.StorageClient) error { + http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { + reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + samples := protoToSamples(&req) + receivedSamples.Add(float64(len(samples))) + + var wg sync.WaitGroup + for _, c := range clients { + wg.Add(1) + go func(rc remote.StorageClient) { + sendSamples(rc, samples) + wg.Done() + }(c) + } + wg.Wait() + }) + + return http.ListenAndServe(addr, nil) +} + +func protoToSamples(req *remote.WriteRequest) model.Samples { + var samples model.Samples + for _, ts := range req.Timeseries { + metric := make(model.Metric, len(ts.Labels)) + for _, l := range ts.Labels { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + + for _, s := range ts.Samples { + samples = append(samples, &model.Sample{ + Metric: metric, + Value: model.SampleValue(s.Value), + Timestamp: model.Time(s.TimestampMs), + }) + } + } + return samples +} + +func sendSamples(c remote.StorageClient, samples model.Samples) { + begin := time.Now() + err := c.Store(samples) + duration := time.Since(begin).Seconds() + if err != nil { + log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err) + failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) + } + sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) + sentBatchDuration.WithLabelValues(c.Name()).Observe(duration) +}