mirror of https://github.com/k3s-io/k3s
Merge pull request #42930 from KarolKraskiewicz/influxdb-clientv2
Automatic merge from submit-queue

update influxdb dependency to v1.1.1 and change client to v2

**What this PR does / why we need it**:
1. Updates the influxdb libraries used by the tests to v1.1.1, matching the version used by Grafana.
2. Switches the influxdb client to v2, because [v1 is being deprecated](https://github.com/influxdata/influxdb/tree/v1.1.1/client#description).

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**: cc @piosz
1. [vendor/BUILD](https://github.com/KarolKraskiewicz/kubernetes/blob/master/vendor/BUILD) didn't get regenerated after executing `./hack/godep-save.sh`, so I left the previous version. Not sure how to trigger regeneration of this file.
2. `tests/e2e/monitoring.go` seems to pass without changes, even after changing the client version.

**Release note**:
```release-note
```
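Illustrative only, not part of this change: a minimal sketch of the client/v2 API the tests now build against. All identifiers come from the vendored `github.com/influxdata/influxdb/client/v2` package added in this PR; the address, credentials, database, tags and measurement are placeholder values, not taken from the e2e test.

```go
package main

import (
	"log"
	"time"

	influxdb "github.com/influxdata/influxdb/client/v2"
)

func main() {
	// NewHTTPClient replaces the v1 client constructor used before this PR.
	c, err := influxdb.NewHTTPClient(influxdb.HTTPConfig{
		Addr:     "http://localhost:8086", // placeholder address
		Username: "root",                  // placeholder credentials
		Password: "root",
	})
	if err != nil {
		log.Fatalln("Error: ", err)
	}
	defer c.Close()

	// Writes go through a batch of points in the v2 API.
	bp, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{
		Database:  "mydb", // placeholder database
		Precision: "s",
	})
	if err != nil {
		log.Fatalln("Error: ", err)
	}

	pt, err := influxdb.NewPoint("cpu_usage",
		map[string]string{"host": "server01"},// placeholder tags
		map[string]interface{}{"user": 46.6}, // placeholder fields
		time.Now())
	if err != nil {
		log.Fatalln("Error: ", err)
	}
	bp.AddPoint(pt)

	if err := c.Write(bp); err != nil {
		log.Fatalln("Error: ", err)
	}
}
```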
commit
9dae6a734a
@@ -1622,18 +1622,23 @@
 		},
 		{
 			"ImportPath": "github.com/influxdata/influxdb/client",
-			"Comment": "v0.12.2",
-			"Rev": "383332daed5595926c235f250b11433f67229c35"
+			"Comment": "v1.1.1",
+			"Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644"
+		},
+		{
+			"ImportPath": "github.com/influxdata/influxdb/client/v2",
+			"Comment": "v1.1.1",
+			"Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644"
 		},
 		{
 			"ImportPath": "github.com/influxdata/influxdb/models",
-			"Comment": "v0.12.2",
-			"Rev": "383332daed5595926c235f250b11433f67229c35"
+			"Comment": "v1.1.1",
+			"Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644"
 		},
 		{
 			"ImportPath": "github.com/influxdata/influxdb/pkg/escape",
-			"Comment": "v0.12.2",
-			"Rev": "383332daed5595926c235f250b11433f67229c35"
+			"Comment": "v1.1.1",
+			"Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644"
 		},
 		{
 			"ImportPath": "github.com/jmespath/go-jmespath",
@@ -57978,6 +57978,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 ================================================================================


+================================================================================
+= vendor/github.com/influxdata/influxdb/client/v2 licensed under: =
+
+The MIT License (MIT)
+
+Copyright (c) 2013-2016 Errplane Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+= vendor/github.com/influxdata/influxdb/LICENSE ba8146ad9cc2a128209983265136e06a -
+================================================================================
+
+
 ================================================================================
 = vendor/github.com/influxdata/influxdb/models licensed under: =
@@ -166,7 +166,7 @@ go_library(
         "//vendor:github.com/ghodss/yaml",
         "//vendor:github.com/golang/glog",
         "//vendor:github.com/google/cadvisor/info/v1",
-        "//vendor:github.com/influxdata/influxdb/client",
+        "//vendor:github.com/influxdata/influxdb/client/v2",
         "//vendor:github.com/onsi/ginkgo",
         "//vendor:github.com/onsi/ginkgo/config",
         "//vendor:github.com/onsi/ginkgo/reporters",
@@ -23,7 +23,7 @@ import (
 	"fmt"
 	"time"
 
-	influxdb "github.com/influxdata/influxdb/client"
+	influxdb "github.com/influxdata/influxdb/client/v2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -4790,12 +4790,25 @@ go_library(
     deps = ["//vendor:github.com/influxdata/influxdb/models"],
 )
 
+go_library(
+    name = "github.com/influxdata/influxdb/client/v2",
+    srcs = [
+        "github.com/influxdata/influxdb/client/v2/client.go",
+        "github.com/influxdata/influxdb/client/v2/udp.go",
+    ],
+    tags = ["automanaged"],
+    deps = ["//vendor:github.com/influxdata/influxdb/models"],
+)
+
 go_library(
     name = "github.com/influxdata/influxdb/models",
     srcs = [
         "github.com/influxdata/influxdb/models/consistency.go",
+        "github.com/influxdata/influxdb/models/inline_fnv.go",
+        "github.com/influxdata/influxdb/models/inline_strconv_parse.go",
         "github.com/influxdata/influxdb/models/points.go",
         "github.com/influxdata/influxdb/models/rows.go",
+        "github.com/influxdata/influxdb/models/statistic.go",
         "github.com/influxdata/influxdb/models/time.go",
     ],
     tags = ["automanaged"],
@@ -1,27 +1,23 @@
 # List
 - bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE)
 - collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
-- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE)
 - github.com/BurntSushi/toml [WTFPL LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING)
 - github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license)
 - github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
+- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE)
+- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE)
 - github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE)
 - github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE)
 - github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE)
-- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE)
 - github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
-- github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE)
-- github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE)
-- github.com/hashicorp/raft-boltdb [MOZILLA PUBLIC LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE)
 - github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
 - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
 - github.com/kimor79/gollectd [BSD LICENSE](https://github.com/kimor79/gollectd/blob/master/LICENSE)
 - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
 - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
 - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
+- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
 - glyphicons [LICENSE](http://glyphicons.com/license/)
 - golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
-- golang.org/x/tools [BSD LICENSE](https://github.com/golang/tools/blob/master/LICENSE)
-- gopkg.in/fatih/pool.v2 [MIT LICENSE](https://github.com/fatih/pool/blob/v2.0.0/LICENSE)
 - jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)
 - react 0.13.3 [BSD LICENSE](https://github.com/facebook/react/blob/master/LICENSE)
@@ -26,7 +26,7 @@ Though not necessary for experimentation, you may want to create a new user
 and authenticate the connection to your database.
 
 For more information please check out the
-[Admin Docs](https://docs.influxdata.com/influxdb/v0.10/administration).
+[Admin Docs](https://docs.influxdata.com/influxdb/latest/administration/).
 
 For the impatient, you can create a new admin user _bubba_ by firing off the
 [InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go).
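Illustrative addition, not part of the vendored README diff: the admin user mentioned above can also be created programmatically through the client/v2 `Query` API documented further down in this README. `bubba` is the user the README names; the address and password are placeholders.

```go
package main

import (
	"log"

	client "github.com/influxdata/influxdb/client/v2"
)

func main() {
	c, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://localhost:8086"})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// WITH ALL PRIVILEGES makes bubba an admin user.
	q := client.NewQuery("CREATE USER bubba WITH PASSWORD 'changeme' WITH ALL PRIVILEGES", "", "")
	resp, err := c.Query(q)
	if err != nil {
		log.Fatal(err)
	}
	if resp.Error() != nil {
		log.Fatal(resp.Error())
	}
}
```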
@@ -49,10 +49,8 @@ the configuration below.
 package main
 
 import (
-	"net/url"
-	"fmt"
 	"log"
-	"os"
+	"time"
 
 	"github.com/influxdata/influxdb/client/v2"
 )
@@ -70,17 +68,17 @@ func main() {
 		Username: username,
 		Password: password,
 	})
 
 	if err != nil {
 		log.Fatalln("Error: ", err)
 	}
 
 	// Create a new point batch
 	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
 		Database:  MyDB,
 		Precision: "s",
 	})
 
 	if err != nil {
 		log.Fatalln("Error: ", err)
 	}
@@ -93,11 +91,11 @@ func main() {
 		"user":   46.6,
 	}
 	pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
 
 	if err != nil {
 		log.Fatalln("Error: ", err)
 	}
 
 	bp.AddPoint(pt)
 
 	// Write the batch
@@ -257,6 +255,28 @@ func WriteUDP() {
 }
 ```
 
+### Point Splitting
+
+The UDP client now supports splitting single points that exceed the configured
+payload size. The logic for processing each point is listed here, starting with
+an empty payload.
+
+1. If adding the point to the current (non-empty) payload would exceed the
+   configured size, send the current payload. Otherwise, add it to the current
+   payload.
+1. If the point is smaller than the configured size, add it to the payload.
+1. If the point has no timestamp, just try to send the entire point as a single
+   UDP payload, and process the next point.
+1. Since the point has a timestamp, re-use the existing measurement name,
+   tagset, and timestamp and create multiple new points by splitting up the
+   fields. The per-point length will be kept close to the configured size,
+   staying under it if possible. This does mean that one large field, maybe a
+   long string, could be sent as a larger-than-configured payload.
+
+The above logic attempts to respect configured payload sizes, but not sacrifice
+any data integrity. Points without a timestamp can't be split, as that may
+cause fields to have differing timestamps when processed by the server.
+
 ## Go Docs
 
 Please refer to
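Not part of the vendored README: a minimal sketch of how a caller opts into the splitting behaviour described above, using the `UDPConfig`/`NewUDPClient` API added in this change. The address and payload size are placeholder values.

```go
package main

import (
	"log"

	client "github.com/influxdata/influxdb/client/v2"
)

func main() {
	// PayloadSize is the limit the splitting logic above tries to respect;
	// oversized points with a timestamp are split across several datagrams.
	c, err := client.NewUDPClient(client.UDPConfig{
		Addr:        "localhost:8089", // placeholder UDP listener address
		PayloadSize: 512,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
}
```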
@@ -181,7 +181,7 @@ func (c *Client) Query(q Query) (*Response, error) {
 	}
 	u.RawQuery = values.Encode()
 
-	req, err := http.NewRequest("GET", u.String(), nil)
+	req, err := http.NewRequest("POST", u.String(), nil)
 	if err != nil {
 		return nil, err
 	}
@@ -387,22 +387,31 @@ func (c *Client) Ping() (time.Duration, string, error) {
 
 // Structs
 
+// Message represents a user message.
+type Message struct {
+	Level string `json:"level,omitempty"`
+	Text  string `json:"text,omitempty"`
+}
+
 // Result represents a resultset returned from a single statement.
 type Result struct {
 	Series   []models.Row
-	Err      error
+	Messages []*Message
+	Err      error
 }
 
 // MarshalJSON encodes the result into JSON.
 func (r *Result) MarshalJSON() ([]byte, error) {
 	// Define a struct that outputs "error" as a string.
 	var o struct {
 		Series   []models.Row `json:"series,omitempty"`
-		Err      string       `json:"error,omitempty"`
+		Messages []*Message   `json:"messages,omitempty"`
+		Err      string       `json:"error,omitempty"`
 	}
 
 	// Copy fields to output struct.
 	o.Series = r.Series
+	o.Messages = r.Messages
 	if r.Err != nil {
 		o.Err = r.Err.Error()
 	}
@@ -413,8 +422,9 @@ func (r *Result) MarshalJSON() ([]byte, error) {
 // UnmarshalJSON decodes the data into the Result struct
 func (r *Result) UnmarshalJSON(b []byte) error {
 	var o struct {
 		Series   []models.Row `json:"series,omitempty"`
-		Err      string       `json:"error,omitempty"`
+		Messages []*Message   `json:"messages,omitempty"`
+		Err      string       `json:"error,omitempty"`
 	}
 
 	dec := json.NewDecoder(bytes.NewBuffer(b))
@@ -424,6 +434,7 @@ func (r *Result) UnmarshalJSON(b []byte) error {
 		return err
 	}
 	r.Series = o.Series
+	r.Messages = o.Messages
 	if o.Err != "" {
 		r.Err = errors.New(o.Err)
 	}
@@ -487,17 +498,36 @@ func (r *Response) Error() error {
 	return nil
 }
 
+// duplexReader reads responses and writes it to another writer while
+// satisfying the reader interface.
+type duplexReader struct {
+	r io.Reader
+	w io.Writer
+}
+
+func (r *duplexReader) Read(p []byte) (n int, err error) {
+	n, err = r.r.Read(p)
+	if err == nil {
+		r.w.Write(p[:n])
+	}
+	return n, err
+}
+
 // ChunkedResponse represents a response from the server that
 // uses chunking to stream the output.
 type ChunkedResponse struct {
 	dec    *json.Decoder
+	duplex *duplexReader
+	buf    bytes.Buffer
 }
 
 // NewChunkedResponse reads a stream and produces responses from the stream.
 func NewChunkedResponse(r io.Reader) *ChunkedResponse {
-	dec := json.NewDecoder(r)
-	dec.UseNumber()
-	return &ChunkedResponse{dec: dec}
+	resp := &ChunkedResponse{}
+	resp.duplex = &duplexReader{r: r, w: &resp.buf}
+	resp.dec = json.NewDecoder(resp.duplex)
+	resp.dec.UseNumber()
+	return resp
 }
 
 // NextResponse reads the next line of the stream and returns a response.
@@ -507,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) {
 		if err == io.EOF {
 			return nil, nil
 		}
-		return nil, err
+		// A decoding error happened. This probably means the server crashed
+		// and sent a last-ditch error message to us. Ensure we have read the
+		// entirety of the connection to get any remaining error text.
+		io.Copy(ioutil.Discard, r.duplex)
+		return nil, errors.New(strings.TrimSpace(r.buf.String()))
 	}
+	r.buf.Reset()
 	return &response, nil
 }
@@ -551,7 +586,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
 // MarshalString renders string representation of a Point with specified
 // precision. The default precision is nanoseconds.
 func (p *Point) MarshalString() string {
-	pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
+	pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
 	if err != nil {
 		return "# ERROR: " + err.Error() + " " + p.Measurement
 	}
@ -0,0 +1,501 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HTTPConfig is the config data needed to create an HTTP Client
|
||||||
|
type HTTPConfig struct {
|
||||||
|
// Addr should be of the form "http://host:port"
|
||||||
|
// or "http://[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// Username is the influxdb username, optional
|
||||||
|
Username string
|
||||||
|
|
||||||
|
// Password is the influxdb password, optional
|
||||||
|
Password string
|
||||||
|
|
||||||
|
// UserAgent is the http User Agent, defaults to "InfluxDBClient"
|
||||||
|
UserAgent string
|
||||||
|
|
||||||
|
// Timeout for influxdb writes, defaults to no timeout
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// InsecureSkipVerify gets passed to the http client, if true, it will
|
||||||
|
// skip https certificate verification. Defaults to false
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
// TLSConfig allows the user to set their own TLS config for the HTTP
|
||||||
|
// Client. If set, this option overrides InsecureSkipVerify.
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct
|
||||||
|
type BatchPointsConfig struct {
|
||||||
|
// Precision is the write precision of the points, defaults to "ns"
|
||||||
|
Precision string
|
||||||
|
|
||||||
|
// Database is the database to write points to
|
||||||
|
Database string
|
||||||
|
|
||||||
|
// RetentionPolicy is the retention policy of the points
|
||||||
|
RetentionPolicy string
|
||||||
|
|
||||||
|
// Write consistency is the number of servers required to confirm write
|
||||||
|
WriteConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is a client interface for writing & querying the database
|
||||||
|
type Client interface {
|
||||||
|
// Ping checks that status of cluster, and will always return 0 time and no
|
||||||
|
// error for UDP clients
|
||||||
|
Ping(timeout time.Duration) (time.Duration, string, error)
|
||||||
|
|
||||||
|
// Write takes a BatchPoints object and writes all Points to InfluxDB.
|
||||||
|
Write(bp BatchPoints) error
|
||||||
|
|
||||||
|
// Query makes an InfluxDB Query on the database. This will fail if using
|
||||||
|
// the UDP client.
|
||||||
|
Query(q Query) (*Response, error)
|
||||||
|
|
||||||
|
// Close releases any resources a Client may be using.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHTTPClient returns a new Client from the provided config.
|
||||||
|
// Client is safe for concurrent use by multiple goroutines.
|
||||||
|
func NewHTTPClient(conf HTTPConfig) (Client, error) {
|
||||||
|
if conf.UserAgent == "" {
|
||||||
|
conf.UserAgent = "InfluxDBClient"
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if u.Scheme != "http" && u.Scheme != "https" {
|
||||||
|
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
|
||||||
|
" must start with http:// or https://", u.Scheme)
|
||||||
|
return nil, errors.New(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := &http.Transport{
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
InsecureSkipVerify: conf.InsecureSkipVerify,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if conf.TLSConfig != nil {
|
||||||
|
tr.TLSClientConfig = conf.TLSConfig
|
||||||
|
}
|
||||||
|
return &client{
|
||||||
|
url: *u,
|
||||||
|
username: conf.Username,
|
||||||
|
password: conf.Password,
|
||||||
|
useragent: conf.UserAgent,
|
||||||
|
httpClient: &http.Client{
|
||||||
|
Timeout: conf.Timeout,
|
||||||
|
Transport: tr,
|
||||||
|
},
|
||||||
|
transport: tr,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
|
||||||
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
|
||||||
|
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
u := c.url
|
||||||
|
u.Path = "ping"
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout > 0 {
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent {
|
||||||
|
var err = fmt.Errorf(string(body))
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
version := resp.Header.Get("X-Influxdb-Version")
|
||||||
|
return time.Since(now), version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the client's resources.
|
||||||
|
func (c *client) Close() error {
|
||||||
|
c.transport.CloseIdleConnections()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// client is safe for concurrent use as the fields are all read-only
|
||||||
|
// once the client is instantiated.
|
||||||
|
type client struct {
|
||||||
|
// N.B - if url.UserInfo is accessed in future modifications to the
|
||||||
|
// methods on client, you will need to syncronise access to url.
|
||||||
|
url url.URL
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
useragent string
|
||||||
|
httpClient *http.Client
|
||||||
|
transport *http.Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPoints is an interface into a batched grouping of points to write into
|
||||||
|
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
|
||||||
|
// batch for each goroutine.
|
||||||
|
type BatchPoints interface {
|
||||||
|
// AddPoint adds the given point to the Batch of points
|
||||||
|
AddPoint(p *Point)
|
||||||
|
// AddPoints adds the given points to the Batch of points
|
||||||
|
AddPoints(ps []*Point)
|
||||||
|
// Points lists the points in the Batch
|
||||||
|
Points() []*Point
|
||||||
|
|
||||||
|
// Precision returns the currently set precision of this Batch
|
||||||
|
Precision() string
|
||||||
|
// SetPrecision sets the precision of this batch.
|
||||||
|
SetPrecision(s string) error
|
||||||
|
|
||||||
|
// Database returns the currently set database of this Batch
|
||||||
|
Database() string
|
||||||
|
// SetDatabase sets the database of this Batch
|
||||||
|
SetDatabase(s string)
|
||||||
|
|
||||||
|
// WriteConsistency returns the currently set write consistency of this Batch
|
||||||
|
WriteConsistency() string
|
||||||
|
// SetWriteConsistency sets the write consistency of this Batch
|
||||||
|
SetWriteConsistency(s string)
|
||||||
|
|
||||||
|
// RetentionPolicy returns the currently set retention policy of this Batch
|
||||||
|
RetentionPolicy() string
|
||||||
|
// SetRetentionPolicy sets the retention policy of this Batch
|
||||||
|
SetRetentionPolicy(s string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatchPoints returns a BatchPoints interface based on the given config.
|
||||||
|
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
|
||||||
|
if conf.Precision == "" {
|
||||||
|
conf.Precision = "ns"
|
||||||
|
}
|
||||||
|
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bp := &batchpoints{
|
||||||
|
database: conf.Database,
|
||||||
|
precision: conf.Precision,
|
||||||
|
retentionPolicy: conf.RetentionPolicy,
|
||||||
|
writeConsistency: conf.WriteConsistency,
|
||||||
|
}
|
||||||
|
return bp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchpoints struct {
|
||||||
|
points []*Point
|
||||||
|
database string
|
||||||
|
precision string
|
||||||
|
retentionPolicy string
|
||||||
|
writeConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoint(p *Point) {
|
||||||
|
bp.points = append(bp.points, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoints(ps []*Point) {
|
||||||
|
bp.points = append(bp.points, ps...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Points() []*Point {
|
||||||
|
return bp.points
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Precision() string {
|
||||||
|
return bp.precision
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Database() string {
|
||||||
|
return bp.database
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) WriteConsistency() string {
|
||||||
|
return bp.writeConsistency
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) RetentionPolicy() string {
|
||||||
|
return bp.retentionPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetPrecision(p string) error {
|
||||||
|
if _, err := time.ParseDuration("1" + p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bp.precision = p
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetDatabase(db string) {
|
||||||
|
bp.database = db
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetWriteConsistency(wc string) {
|
||||||
|
bp.writeConsistency = wc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetRetentionPolicy(rp string) {
|
||||||
|
bp.retentionPolicy = rp
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point represents a single data point
|
||||||
|
type Point struct {
|
||||||
|
pt models.Point
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoint returns a point with the given timestamp. If a timestamp is not
|
||||||
|
// given, then data is sent to the database without a timestamp, in which case
|
||||||
|
// the server will assign local time upon reception. NOTE: it is recommended to
|
||||||
|
// send data with a timestamp.
|
||||||
|
func NewPoint(
|
||||||
|
name string,
|
||||||
|
tags map[string]string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
t ...time.Time,
|
||||||
|
) (*Point, error) {
|
||||||
|
var T time.Time
|
||||||
|
if len(t) > 0 {
|
||||||
|
T = t[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Point{
|
||||||
|
pt: pt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a line-protocol string of the Point
|
||||||
|
func (p *Point) String() string {
|
||||||
|
return p.pt.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrecisionString returns a line-protocol string of the Point, at precision
|
||||||
|
func (p *Point) PrecisionString(precison string) string {
|
||||||
|
return p.pt.PrecisionString(precison)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the measurement name of the point
|
||||||
|
func (p *Point) Name() string {
|
||||||
|
return p.pt.Name()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tags returns the tags associated with the point
|
||||||
|
func (p *Point) Tags() map[string]string {
|
||||||
|
return p.pt.Tags().Map()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Time return the timestamp for the point
|
||||||
|
func (p *Point) Time() time.Time {
|
||||||
|
return p.pt.Time()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnixNano returns the unix nano time of the point
|
||||||
|
func (p *Point) UnixNano() int64 {
|
||||||
|
return p.pt.UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fields returns the fields for the point
|
||||||
|
func (p *Point) Fields() map[string]interface{} {
|
||||||
|
return p.pt.Fields()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPointFrom returns a point from the provided models.Point.
|
||||||
|
func NewPointFrom(pt models.Point) *Point {
|
||||||
|
return &Point{pt: pt}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) Write(bp BatchPoints) error {
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.WriteByte('\n'); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = "write"
|
||||||
|
req, err := http.NewRequest("POST", u.String(), &b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", bp.Database())
|
||||||
|
params.Set("rp", bp.RetentionPolicy())
|
||||||
|
params.Set("precision", bp.Precision())
|
||||||
|
params.Set("consistency", bp.WriteConsistency())
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
var err = fmt.Errorf(string(body))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query defines a query to send to the server
|
||||||
|
type Query struct {
|
||||||
|
Command string
|
||||||
|
Database string
|
||||||
|
Precision string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQuery returns a query object
|
||||||
|
// database and precision strings can be empty strings if they are not needed
|
||||||
|
// for the query.
|
||||||
|
func NewQuery(command, database, precision string) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
Precision: precision,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []Result
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// Returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != "" {
|
||||||
|
return fmt.Errorf(r.Err)
|
||||||
|
}
|
||||||
|
for _, result := range r.Results {
|
||||||
|
if result.Err != "" {
|
||||||
|
return fmt.Errorf(result.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message represents a user message.
|
||||||
|
type Message struct {
|
||||||
|
Level string
|
||||||
|
Text string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result represents a resultset returned from a single statement.
|
||||||
|
type Result struct {
|
||||||
|
Series []models.Row
|
||||||
|
Messages []*Message
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query sends a command to the server and returns the Response
|
||||||
|
func (c *client) Query(q Query) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = "query"
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("q", q.Command)
|
||||||
|
params.Set("db", q.Database)
|
||||||
|
if q.Precision != "" {
|
||||||
|
params.Set("epoch", q.Precision)
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
dec.UseNumber()
|
||||||
|
decErr := dec.Decode(&response)
|
||||||
|
|
||||||
|
// ignore this error if we got an invalid status code
|
||||||
|
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
|
||||||
|
decErr = nil
|
||||||
|
}
|
||||||
|
// If we got a valid decode error, send that back
|
||||||
|
if decErr != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
|
||||||
|
}
|
||||||
|
// If we don't have an error in our json response, and didn't get statusOK
|
||||||
|
// then send back an error
|
||||||
|
if resp.StatusCode != http.StatusOK && response.Error() == nil {
|
||||||
|
return &response, fmt.Errorf("received status code %d from server",
|
||||||
|
resp.StatusCode)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
|
@ -0,0 +1,112 @@
|
||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// UDPPayloadSize is a reasonable default payload size for UDP packets that
|
||||||
|
// could be travelling over the internet.
|
||||||
|
UDPPayloadSize = 512
|
||||||
|
)
|
||||||
|
|
||||||
|
// UDPConfig is the config data needed to create a UDP Client
|
||||||
|
type UDPConfig struct {
|
||||||
|
// Addr should be of the form "host:port"
|
||||||
|
// or "[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// PayloadSize is the maximum size of a UDP client message, optional
|
||||||
|
// Tune this based on your network. Defaults to UDPPayloadSize.
|
||||||
|
PayloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
|
||||||
|
// service from the given config.
|
||||||
|
func NewUDPClient(conf UDPConfig) (Client, error) {
|
||||||
|
var udpAddr *net.UDPAddr
|
||||||
|
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadSize := conf.PayloadSize
|
||||||
|
if payloadSize == 0 {
|
||||||
|
payloadSize = UDPPayloadSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return &udpclient{
|
||||||
|
conn: conn,
|
||||||
|
payloadSize: payloadSize,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the udpclient's resources.
|
||||||
|
func (uc *udpclient) Close() error {
|
||||||
|
return uc.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type udpclient struct {
|
||||||
|
conn io.WriteCloser
|
||||||
|
payloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Write(bp BatchPoints) error {
|
||||||
|
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
|
||||||
|
var d, _ = time.ParseDuration("1" + bp.Precision())
|
||||||
|
|
||||||
|
var delayedError error
|
||||||
|
|
||||||
|
var checkBuffer = func(n int) {
|
||||||
|
if len(b) > 0 && len(b)+n > uc.payloadSize {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
delayedError = err
|
||||||
|
}
|
||||||
|
b = b[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
p.pt.Round(d)
|
||||||
|
pointSize := p.pt.StringSize() + 1 // include newline in size
|
||||||
|
//point := p.pt.RoundedString(d) + "\n"
|
||||||
|
|
||||||
|
checkBuffer(pointSize)
|
||||||
|
|
||||||
|
if p.Time().IsZero() || pointSize <= uc.payloadSize {
|
||||||
|
b = p.pt.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
|
||||||
|
for _, sp := range points {
|
||||||
|
checkBuffer(sp.StringSize() + 1)
|
||||||
|
b = sp.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b) > 0 {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return delayedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Query(q Query) (*Response, error) {
|
||||||
|
return nil, fmt.Errorf("Querying via UDP is not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
return 0, "", nil
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
// from stdlib hash/fnv/fnv.go
|
||||||
|
const (
|
||||||
|
prime64 = 1099511628211
|
||||||
|
offset64 = 14695981039346656037
|
||||||
|
)
|
||||||
|
|
||||||
|
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
|
||||||
|
type InlineFNV64a uint64
|
||||||
|
|
||||||
|
func NewInlineFNV64a() InlineFNV64a {
|
||||||
|
return offset64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *InlineFNV64a) Write(data []byte) (int, error) {
|
||||||
|
hash := uint64(*s)
|
||||||
|
for _, c := range data {
|
||||||
|
hash ^= uint64(c)
|
||||||
|
hash *= prime64
|
||||||
|
}
|
||||||
|
*s = InlineFNV64a(hash)
|
||||||
|
return len(data), nil
|
||||||
|
}
|
||||||
|
func (s *InlineFNV64a) Sum64() uint64 {
|
||||||
|
return uint64(*s)
|
||||||
|
}
|
vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go (new file, 38 lines, generated, vendored)
@ -0,0 +1,38 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
|
||||||
|
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
|
||||||
|
s := unsafeBytesToString(b)
|
||||||
|
return strconv.ParseInt(s, base, bitSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
|
||||||
|
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
|
||||||
|
s := unsafeBytesToString(b)
|
||||||
|
return strconv.ParseFloat(s, bitSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
|
||||||
|
func parseBoolBytes(b []byte) (bool, error) {
|
||||||
|
return strconv.ParseBool(unsafeBytesToString(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsafeBytesToString converts a []byte to a string without a heap allocation.
|
||||||
|
//
|
||||||
|
// It is unsafe, and is intended to prepare input to short-lived functions
|
||||||
|
// that require strings.
|
||||||
|
func unsafeBytesToString(in []byte) string {
|
||||||
|
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
|
||||||
|
dst := reflect.StringHeader{
|
||||||
|
Data: src.Data,
|
||||||
|
Len: src.Len,
|
||||||
|
}
|
||||||
|
s := *(*string)(unsafe.Pointer(&dst))
|
||||||
|
return s
|
||||||
|
}
|
(File diff suppressed because it is too large)
@@ -1,7 +1,6 @@
 package models
 
 import (
-	"hash/fnv"
 	"sort"
 )
@@ -11,7 +10,6 @@ type Row struct {
 	Tags    map[string]string `json:"tags,omitempty"`
 	Columns []string          `json:"columns,omitempty"`
 	Values  [][]interface{}   `json:"values,omitempty"`
-	Err     error             `json:"err,omitempty"`
 }
 
 // SameSeries returns true if r contains values for the same series as o.
@@ -21,7 +19,7 @@ func (r *Row) SameSeries(o *Row) bool {
 
 // tagsHash returns a hash of tag key/value pairs.
 func (r *Row) tagsHash() uint64 {
-	h := fnv.New64a()
+	h := NewInlineFNV64a()
 	keys := r.tagsKeys()
 	for _, k := range keys {
 		h.Write([]byte(k))
@ -0,0 +1,40 @@
|
||||||
|
package models
|
||||||
|
|
||||||
|
type Statistic struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Values map[string]interface{} `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStatistic(name string) Statistic {
|
||||||
|
return Statistic{
|
||||||
|
Name: name,
|
||||||
|
Tags: make(map[string]string),
|
||||||
|
Values: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatisticTags is a map that can be merged with others without causing
|
||||||
|
// mutations to either map.
|
||||||
|
type StatisticTags map[string]string
|
||||||
|
|
||||||
|
// Merge creates a new map containing the merged contents of tags and t.
|
||||||
|
// If both tags and the receiver map contain the same key, the value in tags
|
||||||
|
// is used in the resulting map.
|
||||||
|
//
|
||||||
|
// Merge always returns a usable map.
|
||||||
|
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
|
||||||
|
// Add everything in tags to the result.
|
||||||
|
out := make(map[string]string, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only add values from t that don't appear in tags.
|
||||||
|
for k, v := range t {
|
||||||
|
if _, ok := tags[k]; !ok {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
|
@@ -9,14 +9,36 @@ import (
 	"time"
 )
 
+const (
+	// MinNanoTime is the minumum time that can be represented.
+	//
+	// 1677-09-21 00:12:43.145224194 +0000 UTC
+	//
+	// The two lowest minimum integers are used as sentinel values. The
+	// minimum value needs to be used as a value lower than any other value for
+	// comparisons and another separate value is needed to act as a sentinel
+	// default value that is unusable by the user, but usable internally.
+	// Because these two values need to be used for a special purpose, we do
+	// not allow users to write points at these two times.
+	MinNanoTime = int64(math.MinInt64) + 2
+
+	// MaxNanoTime is the maximum time that can be represented.
+	//
+	// 2262-04-11 23:47:16.854775806 +0000 UTC
+	//
+	// The highest time represented by a nanosecond needs to be used for an
+	// exclusive range in the shard group, so the maximum time needs to be one
+	// less than the possible maximum number of nanoseconds representable by an
+	// int64 so that we don't lose a point at that one time.
+	MaxNanoTime = int64(math.MaxInt64) - 1
+)
+
 var (
-	// MaxNanoTime is the maximum time that can be represented via int64 nanoseconds since the epoch.
-	MaxNanoTime = time.Unix(0, math.MaxInt64).UTC()
-	// MinNanoTime is the minumum time that can be represented via int64 nanoseconds since the epoch.
-	MinNanoTime = time.Unix(0, math.MinInt64).UTC()
+	minNanoTime = time.Unix(0, MinNanoTime).UTC()
+	maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
 
 	// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
-	ErrTimeOutOfRange = fmt.Errorf("time outside range %s - %s", MinNanoTime, MaxNanoTime)
+	ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
 )
 
 // SafeCalcTime safely calculates the time given. Will return error if the time is outside the
|
||||||
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
|
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
|
||||||
mult := GetPrecisionMultiplier(precision)
|
mult := GetPrecisionMultiplier(precision)
|
||||||
if t, ok := safeSignedMult(timestamp, mult); ok {
|
if t, ok := safeSignedMult(timestamp, mult); ok {
|
||||||
return time.Unix(0, t).UTC(), nil
|
tme := time.Unix(0, t).UTC()
|
||||||
|
return tme, CheckTime(tme)
|
||||||
}
|
}
|
||||||
|
|
||||||
return time.Time{}, ErrTimeOutOfRange
|
return time.Time{}, ErrTimeOutOfRange
|
||||||
|
@ -32,7 +55,7 @@ func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
|
||||||
|
|
||||||
// CheckTime checks that a time is within the safe range.
|
// CheckTime checks that a time is within the safe range.
|
||||||
func CheckTime(t time.Time) error {
|
func CheckTime(t time.Time) error {
|
||||||
if t.Before(MinNanoTime) || t.After(MaxNanoTime) {
|
if t.Before(minNanoTime) || t.After(maxNanoTime) {
|
||||||
return ErrTimeOutOfRange
|
return ErrTimeOutOfRange
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -43,7 +66,7 @@ func safeSignedMult(a, b int64) (int64, bool) {
|
||||||
if a == 0 || b == 0 || a == 1 || b == 1 {
|
if a == 0 || b == 0 || a == 1 || b == 1 {
|
||||||
return a * b, true
|
return a * b, true
|
||||||
}
|
}
|
||||||
if a == math.MinInt64 || b == math.MaxInt64 {
|
if a == MinNanoTime || b == MaxNanoTime {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
c := a * b
|
c := a * b
|
||||||
|
|
|
@@ -1,6 +1,9 @@
 package escape
 
-import "bytes"
+import (
+	"bytes"
+	"strings"
+)
 
 func Bytes(in []byte) []byte {
 	for b, esc := range Codes {
@ -9,7 +12,54 @@ func Bytes(in []byte) []byte {
|
||||||
return in
|
return in
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const escapeChars = `," =`
|
||||||
|
|
||||||
|
func IsEscaped(b []byte) bool {
|
||||||
|
for len(b) > 0 {
|
||||||
|
i := bytes.IndexByte(b, '\\')
|
||||||
|
if i < 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
b = b[i+1:]
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func AppendUnescaped(dst, src []byte) []byte {
|
||||||
|
var pos int
|
||||||
|
for len(src) > 0 {
|
||||||
|
next := bytes.IndexByte(src[pos:], '\\')
|
||||||
|
if next < 0 || pos+next+1 >= len(src) {
|
||||||
|
return append(dst, src...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
|
||||||
|
if pos+next > 0 {
|
||||||
|
dst = append(dst, src[:pos+next]...)
|
||||||
|
}
|
||||||
|
src = src[pos+next+1:]
|
||||||
|
pos = 0
|
||||||
|
} else {
|
||||||
|
pos += next + 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
func Unescape(in []byte) []byte {
|
func Unescape(in []byte) []byte {
|
||||||
|
if len(in) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.IndexByte(in, '\\') == -1 {
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
inLen := len(in)
|
inLen := len(in)
|
||||||
var out []byte
|
var out []byte
|
||||||
|
|
|
@@ -20,6 +20,10 @@ func init() {
 }
 
 func UnescapeString(in string) string {
+	if strings.IndexByte(in, '\\') == -1 {
+		return in
+	}
+
 	for b, esc := range codesStr {
 		in = strings.Replace(in, esc, b, -1)
 	}