Browse Source

Fix InfluxDB write support to work with InfluxDB 0.9.x.

Because the InfluxDB client library currently pulls in multiple MBs of
unnecessary dependencies, I have modified and cut up the vendored
version to only pull in the few pieces that are actually needed.

On InfluxDB's side, this dependency issue is tracked in:

https://github.com/influxdb/influxdb/issues/3447

Hopefully, it will be resolved soon.

If a password is needed for InfluxDB, it may be supplied via the
INFLUXDB_PW environment variable.
pull/1087/head
Julius Volz 9 years ago
parent
commit
eeb1da36ac
  1. 34
      cmd/prometheus/config.go
  2. 150
      storage/remote/influxdb/client.go
  3. 51
      storage/remote/influxdb/client_test.go
  4. 18
      storage/remote/remote.go
  5. 180
      vendor/github.com/influxdb/influxdb/client/influxdb.go
  6. 1392
      vendor/github.com/influxdb/influxdb/tsdb/points.go
  7. 12
      vendor/vendor.json

34
cmd/prometheus/config.go

@ -48,6 +48,7 @@ var cfg = struct {
remote remote.Options remote remote.Options
prometheusURL string prometheusURL string
influxdbURL string
}{} }{}
func init() { func init() {
@ -167,13 +168,17 @@ func init() {
"The URL of the remote OpenTSDB server to send samples to. None, if empty.", "The URL of the remote OpenTSDB server to send samples to. None, if empty.",
) )
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "", &cfg.influxdbURL, "storage.remote.influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.", "The URL of the remote InfluxDB server to send samples to. None, if empty.",
) )
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default", &cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
"The InfluxDB retention policy to use.", "The InfluxDB retention policy to use.",
) )
cfg.fs.StringVar(
&cfg.remote.InfluxdbUsername, "storage.remote.influxdb.username", "",
"The username to use when sending samples to InfluxDB.",
)
cfg.fs.StringVar( cfg.fs.StringVar(
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus", &cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
"The name of the database to use for storing samples in InfluxDB.", "The name of the database to use for storing samples in InfluxDB.",
@ -221,6 +226,20 @@ func parse(args []string) error {
return err return err
} }
if err := parsePrometheusURL(); err != nil {
return err
}
if err := parseInfluxdbURL(); err != nil {
return err
}
cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW")
return nil
}
func parsePrometheusURL() error {
if cfg.prometheusURL == "" { if cfg.prometheusURL == "" {
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
@ -244,7 +263,20 @@ func parse(args []string) error {
ppref = "/" + ppref ppref = "/" + ppref
} }
cfg.web.ExternalURL.Path = ppref cfg.web.ExternalURL.Path = ppref
return nil
}
func parseInfluxdbURL() error {
if cfg.influxdbURL == "" {
return nil
}
url, err := url.Parse(cfg.influxdbURL)
if err != nil {
return err
}
cfg.remote.InfluxdbURL = url
return nil return nil
} }

150
storage/remote/influxdb/client.go

@ -14,148 +14,96 @@
package influxdb package influxdb
import ( import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math" "math"
"net/http"
"net/url"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/log" "github.com/prometheus/log"
"github.com/prometheus/prometheus/util/httputil" influx "github.com/influxdb/influxdb/client"
)
const (
writeEndpoint = "/write"
contentTypeJSON = "application/json"
) )
// Client allows sending batches of Prometheus samples to InfluxDB. // Client allows sending batches of Prometheus samples to InfluxDB.
type Client struct { type Client struct {
url string client *influx.Client
httpClient *http.Client
retentionPolicy string
database string database string
retentionPolicy string
ignoredSamples prometheus.Counter
} }
// NewClient creates a new Client. // NewClient creates a new Client.
func NewClient(url string, timeout time.Duration, database, retentionPolicy string) *Client { func NewClient(conf influx.Config, db string, rp string) *Client {
return &Client{ c, err := influx.NewClient(conf)
url: url, // Currently influx.NewClient() *should* never return an error.
httpClient: httputil.NewDeadlineClient(timeout, nil), if err != nil {
retentionPolicy: retentionPolicy, log.Fatal(err)
database: database,
} }
}
// StoreSamplesRequest is used for building a JSON request for storing samples return &Client{
// in InfluxDB. client: c,
type StoreSamplesRequest struct { database: db,
Database string `json:"database"` retentionPolicy: rp,
RetentionPolicy string `json:"retentionPolicy"` ignoredSamples: prometheus.NewCounter(
Points []point `json:"points"` prometheus.CounterOpts{
} Name: "prometheus_influxdb_ignored_samples_total",
Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
// point represents a single InfluxDB measurement. },
type point struct { ),
Timestamp int64 `json:"timestamp"` }
Precision string `json:"precision"`
Name model.LabelValue `json:"name"`
Tags model.LabelSet `json:"tags"`
Fields fields `json:"fields"`
}
// fields represents the fields/columns sent to InfluxDB for a given measurement.
type fields struct {
Value model.SampleValue `json:"value"`
} }
// tagsFromMetric extracts InfluxDB tags from a Prometheus metric. // tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
func tagsFromMetric(m model.Metric) model.LabelSet { func tagsFromMetric(m model.Metric) map[string]string {
tags := make(model.LabelSet, len(m)-1) tags := make(map[string]string, len(m)-1)
for l, v := range m { for l, v := range m {
if l == model.MetricNameLabel { if l != model.MetricNameLabel {
continue tags[string(l)] = string(v)
} }
tags[l] = v
} }
return tags return tags
} }
// Store sends a batch of samples to InfluxDB via its HTTP API. // Store sends a batch of samples to InfluxDB via its HTTP API.
func (c *Client) Store(samples model.Samples) error { func (c *Client) Store(samples model.Samples) error {
points := make([]point, 0, len(samples)) points := make([]influx.Point, 0, len(samples))
for _, s := range samples { for _, s := range samples {
v := float64(s.Value) v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) { if math.IsNaN(v) || math.IsInf(v, 0) {
// TODO(julius): figure out if it's possible to insert special float log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
// values into InfluxDB somehow. c.ignoredSamples.Inc()
log.Warnf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
continue continue
} }
metric := s.Metric[model.MetricNameLabel] points = append(points, influx.Point{
points = append(points, point{ Measurement: string(s.Metric[model.MetricNameLabel]),
Timestamp: s.Timestamp.UnixNano(), Tags: tagsFromMetric(s.Metric),
Precision: "n", Time: s.Timestamp.Time(),
Name: metric, Precision: "ms",
Tags: tagsFromMetric(s.Metric), Fields: map[string]interface{}{
Fields: fields{ "value": v,
Value: s.Value,
}, },
}) })
} }
u, err := url.Parse(c.url) bps := influx.BatchPoints{
if err != nil { Points: points,
return err
}
u.Path = writeEndpoint
req := StoreSamplesRequest{
Database: c.database, Database: c.database,
RetentionPolicy: c.retentionPolicy, RetentionPolicy: c.retentionPolicy,
Points: points,
}
buf, err := json.Marshal(req)
if err != nil {
return err
}
resp, err := c.httpClient.Post(
u.String(),
contentTypeJSON,
bytes.NewBuffer(buf),
)
if err != nil {
return err
}
defer resp.Body.Close()
// API returns status code 200 for successful writes.
// http://influxdb.com/docs/v0.9/concepts/reading_and_writing_data.html#response
if resp.StatusCode == http.StatusOK {
return nil
} }
_, err := c.client.Write(bps)
// API returns error details in the response content in JSON. return err
buf, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var r map[string]string
if err := json.Unmarshal(buf, &r); err != nil {
return err
}
return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"])
} }
// Name identifies the client as an InfluxDB client. // Name identifies the client as an InfluxDB client.
func (c Client) Name() string { func (c Client) Name() string {
return "influxdb" return "influxdb"
} }
// Describe implements prometheus.Collector.
func (c *Client) Describe(ch chan<- *prometheus.Desc) {
ch <- c.ignoredSamples.Desc()
}
// Collect implements prometheus.Collector.
func (c *Client) Collect(ch chan<- prometheus.Metric) {
ch <- c.ignoredSamples
}

51
storage/remote/influxdb/client_test.go

@ -18,9 +18,12 @@ import (
"math" "math"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"testing" "testing"
"time" "time"
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
) )
@ -44,43 +47,63 @@ func TestClient(t *testing.T) {
}, },
{ {
Metric: model.Metric{ Metric: model.Metric{
model.MetricNameLabel: "special_float_value", model.MetricNameLabel: "nan_value",
}, },
Timestamp: model.Time(123456789123), Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.NaN()), Value: model.SampleValue(math.NaN()),
}, },
{
Metric: model.Metric{
model.MetricNameLabel: "pos_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(1)),
},
{
Metric: model.Metric{
model.MetricNameLabel: "neg_inf_value",
},
Timestamp: model.Time(123456789123),
Value: model.SampleValue(math.Inf(-1)),
},
} }
expectedJSON := `{"database":"prometheus","retentionPolicy":"default","points":[{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value1"},"fields":{"value":"1.23"}},{"timestamp":123456789123000000,"precision":"n","name":"testmetric","tags":{"test_label":"test_label_value2"},"fields":{"value":"5.1234"}}]}` expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000
testmetric,test_label=test_label_value2 value=5.1234 123456789123000000
`
server := httptest.NewServer(http.HandlerFunc( server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) { func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" { if r.Method != "POST" {
t.Fatalf("Unexpected method; expected POST, got %s", r.Method) t.Fatalf("Unexpected method; expected POST, got %s", r.Method)
} }
if r.URL.Path != writeEndpoint { if r.URL.Path != "/write" {
t.Fatalf("Unexpected path; expected %s, got %s", writeEndpoint, r.URL.Path) t.Fatalf("Unexpected path; expected %s, got %s", "/write", r.URL.Path)
}
ct := r.Header["Content-Type"]
if len(ct) != 1 {
t.Fatalf("Unexpected number of 'Content-Type' headers; got %d, want 1", len(ct))
}
if ct[0] != contentTypeJSON {
t.Fatalf("Unexpected 'Content-type'; expected %s, got %s", contentTypeJSON, ct[0])
} }
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
t.Fatalf("Error reading body: %s", err) t.Fatalf("Error reading body: %s", err)
} }
if string(b) != expectedJSON { if string(b) != expectedBody {
t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedJSON, string(b)) t.Fatalf("Unexpected request body; expected:\n\n%s\n\ngot:\n\n%s", expectedBody, string(b))
} }
}, },
)) ))
defer server.Close() defer server.Close()
c := NewClient(server.URL, time.Minute, "prometheus", "default") serverURL, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("Unable to parse server URL %s: %s", server.URL, err)
}
conf := influx.Config{
URL: *serverURL,
Username: "testuser",
Password: "testpass",
Timeout: time.Minute,
}
c := NewClient(conf, "test_db", "default")
if err := c.Store(samples); err != nil { if err := c.Store(samples); err != nil {
t.Fatalf("Error sending samples: %s", err) t.Fatalf("Error sending samples: %s", err)

18
storage/remote/remote.go

@ -14,12 +14,15 @@
package remote package remote
import ( import (
"net/url"
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb" "github.com/prometheus/prometheus/storage/remote/opentsdb"
@ -49,8 +52,15 @@ func New(o *Options) *Storage {
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout) c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
} }
if o.InfluxdbURL != "" { if o.InfluxdbURL != nil {
c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy) conf := influx.Config{
URL: *o.InfluxdbURL,
Username: o.InfluxdbUsername,
Password: o.InfluxdbPassword,
Timeout: o.StorageTimeout,
}
c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
prometheus.MustRegister(c)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
} }
if len(s.queues) == 0 { if len(s.queues) == 0 {
@ -62,8 +72,10 @@ func New(o *Options) *Storage {
// Options contains configuration parameters for a remote storage. // Options contains configuration parameters for a remote storage.
type Options struct { type Options struct {
StorageTimeout time.Duration StorageTimeout time.Duration
InfluxdbURL string InfluxdbURL *url.URL
InfluxdbRetentionPolicy string InfluxdbRetentionPolicy string
InfluxdbUsername string
InfluxdbPassword string
InfluxdbDatabase string InfluxdbDatabase string
OpentsdbURL string OpentsdbURL string
} }

180
vendor/github.com/influxdb/influxdb/client/influxdb.go generated vendored

@ -0,0 +1,180 @@
package client
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/influxdb/influxdb/tsdb"
)
const (
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
DefaultTimeout = 0
)
// Config is used to specify what server to connect to.
// URL: The URL of the server connecting to.
// Username/Password are optional. They will be passed via basic auth if provided.
// UserAgent: If not provided, will default "InfluxDBClient",
// Timeout: If not provided, will default to 0 (no timeout)
type Config struct {
URL url.URL
Username string
Password string
UserAgent string
Timeout time.Duration
Precision string
}
// NewConfig will create a config to be used in connecting to the client
func NewConfig() Config {
return Config{
Timeout: DefaultTimeout,
}
}
// Client is used to make calls to the server.
type Client struct {
url url.URL
username string
password string
httpClient *http.Client
userAgent string
precision string
}
const (
ConsistencyOne = "one"
ConsistencyAll = "all"
ConsistencyQuorum = "quorum"
ConsistencyAny = "any"
)
// NewClient will instantiate and return a connected client to issue commands to the server.
func NewClient(c Config) (*Client, error) {
client := Client{
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{Timeout: c.Timeout},
userAgent: c.UserAgent,
precision: c.Precision,
}
if client.userAgent == "" {
client.userAgent = "InfluxDBClient"
}
return &client, nil
}
// Write takes BatchPoints and allows for writing of multiple points with defaults
// If successful, error is nil and Response is nil
// If an error occurs, Response may contain additional information if populated.
func (c *Client) Write(bp BatchPoints) (*Response, error) {
u := c.url
u.Path = "write"
var b bytes.Buffer
for _, p := range bp.Points {
if p.Raw != "" {
if _, err := b.WriteString(p.Raw); err != nil {
return nil, err
}
} else {
for k, v := range bp.Tags {
if p.Tags == nil {
p.Tags = make(map[string]string, len(bp.Tags))
}
p.Tags[k] = v
}
if _, err := b.WriteString(p.MarshalString()); err != nil {
return nil, err
}
}
if err := b.WriteByte('\n'); err != nil {
return nil, err
}
}
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.userAgent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database)
params.Set("rp", bp.RetentionPolicy)
params.Set("precision", bp.Precision)
params.Set("consistency", bp.WriteConsistency)
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
response.Err = err
return &response, err
}
return nil, nil
}
// Structs
// Response represents a list of statement results.
type Response struct {
Err error
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type Point struct {
Measurement string
Tags map[string]string
Time time.Time
Fields map[string]interface{}
Precision string
Raw string
}
func (p *Point) MarshalString() string {
return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String()
}
// BatchPoints is used to send batched data in a single write.
// Database and Points are required
// If no retention policy is specified, it will use the databases default retention policy.
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored.
// If time is specified, it will be applied to any point with an empty time.
// Precision can be specified if the time is in epoch format (integer).
// Valid values for Precision are n, u, ms, s, m, and h
type BatchPoints struct {
Points []Point `json:"points,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Time time.Time `json:"time,omitempty"`
Precision string `json:"precision,omitempty"`
WriteConsistency string `json:"-"`
}

1392
vendor/github.com/influxdb/influxdb/tsdb/points.go generated vendored

File diff suppressed because it is too large Load Diff

12
vendor/vendor.json vendored

@ -32,6 +32,16 @@
"revision": "34f98f7bdf2eec7517e3aac44691566963152721", "revision": "34f98f7bdf2eec7517e3aac44691566963152721",
"revisionTime": "2015-09-08T11:01:49-04:00" "revisionTime": "2015-09-08T11:01:49-04:00"
}, },
{
"path": "github.com/influxdb/influxdb/client",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b",
"revisionTime": "2015-09-16T14:41:53+02:00"
},
{
"path": "github.com/influxdb/influxdb/tsdb",
"revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b",
"revisionTime": "2015-09-16T14:41:53+02:00"
},
{ {
"path": "github.com/julienschmidt/httprouter", "path": "github.com/julienschmidt/httprouter",
"revision": "109e267447e95ad1bb48b758e40dd7453eb7b039", "revision": "109e267447e95ad1bb48b758e40dd7453eb7b039",
@ -163,4 +173,4 @@
"revisionTime": "2015-06-24T11:29:02+01:00" "revisionTime": "2015-06-24T11:29:02+01:00"
} }
] ]
} }

Loading…
Cancel
Save