diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 7472b1a894..926d4ff9b2 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1622,18 +1622,23 @@ }, { "ImportPath": "github.com/influxdata/influxdb/client", - "Comment": "v0.12.2", - "Rev": "383332daed5595926c235f250b11433f67229c35" + "Comment": "v1.1.1", + "Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644" + }, + { + "ImportPath": "github.com/influxdata/influxdb/client/v2", + "Comment": "v1.1.1", + "Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644" }, { "ImportPath": "github.com/influxdata/influxdb/models", - "Comment": "v0.12.2", - "Rev": "383332daed5595926c235f250b11433f67229c35" + "Comment": "v1.1.1", + "Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644" }, { "ImportPath": "github.com/influxdata/influxdb/pkg/escape", - "Comment": "v0.12.2", - "Rev": "383332daed5595926c235f250b11433f67229c35" + "Comment": "v1.1.1", + "Rev": "e47cf1f2e83a02443d7115c54f838be8ee959644" }, { "ImportPath": "github.com/jmespath/go-jmespath", diff --git a/Godeps/LICENSES b/Godeps/LICENSES index 947bd7e5f0..84937327c5 100644 --- a/Godeps/LICENSES +++ b/Godeps/LICENSES @@ -57978,6 +57978,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ================================================================================ +================================================================================ += vendor/github.com/influxdata/influxdb/client/v2 licensed under: = + +The MIT License (MIT) + +Copyright (c) 2013-2016 Errplane Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ += vendor/github.com/influxdata/influxdb/LICENSE ba8146ad9cc2a128209983265136e06a - +================================================================================ + + ================================================================================ = vendor/github.com/influxdata/influxdb/models licensed under: = diff --git a/test/e2e/BUILD b/test/e2e/BUILD index 875e7f6e8d..aefd34797a 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -166,7 +166,7 @@ go_library( "//vendor:github.com/ghodss/yaml", "//vendor:github.com/golang/glog", "//vendor:github.com/google/cadvisor/info/v1", - "//vendor:github.com/influxdata/influxdb/client", + "//vendor:github.com/influxdata/influxdb/client/v2", "//vendor:github.com/onsi/ginkgo", "//vendor:github.com/onsi/ginkgo/config", "//vendor:github.com/onsi/ginkgo/reporters", diff --git a/test/e2e/monitoring.go b/test/e2e/monitoring.go index 35f2816b22..e08974fa96 100644 --- a/test/e2e/monitoring.go +++ b/test/e2e/monitoring.go @@ -23,7 +23,7 @@ import ( "fmt" "time" - influxdb "github.com/influxdata/influxdb/client" + influxdb "github.com/influxdata/influxdb/client/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" diff --git a/vendor/BUILD b/vendor/BUILD index 811973ee8f..4dca07c936 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -4790,12 +4790,25 @@ go_library( deps = ["//vendor:github.com/influxdata/influxdb/models"], ) +go_library( + name = "github.com/influxdata/influxdb/client/v2", + srcs = [ + "github.com/influxdata/influxdb/client/v2/client.go", + "github.com/influxdata/influxdb/client/v2/udp.go", + ], + tags = ["automanaged"], + deps = ["//vendor:github.com/influxdata/influxdb/models"], +) + go_library( name = "github.com/influxdata/influxdb/models", srcs = [ "github.com/influxdata/influxdb/models/consistency.go", + "github.com/influxdata/influxdb/models/inline_fnv.go", + "github.com/influxdata/influxdb/models/inline_strconv_parse.go", "github.com/influxdata/influxdb/models/points.go", "github.com/influxdata/influxdb/models/rows.go", + "github.com/influxdata/influxdb/models/statistic.go", "github.com/influxdata/influxdb/models/time.go", ], tags = ["automanaged"], diff --git a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md b/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md index f0794abc11..14bbfa9535 100644 --- a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md +++ b/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md @@ -1,27 +1,23 @@ # List - bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE) - collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE) -- github.com/armon/go-metrics [MIT LICENSE](https://github.com/armon/go-metrics/blob/master/LICENSE) - github.com/BurntSushi/toml [WTFPL LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING) - github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license) - github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE) +- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE) +- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE) - github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE) - github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE) - 
github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE) -- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE) - github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE) -- github.com/hashicorp/go-msgpack [BSD LICENSE](https://github.com/hashicorp/go-msgpack/blob/master/LICENSE) -- github.com/hashicorp/raft [MPL LICENSE](https://github.com/hashicorp/raft/blob/master/LICENSE) -- github.com/hashicorp/raft-boltdb [MOZILLA PUBLIC LICENSE](https://github.com/hashicorp/raft-boltdb/blob/master/LICENSE) - github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt) - github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) - github.com/kimor79/gollectd [BSD LICENSE](https://github.com/kimor79/gollectd/blob/master/LICENSE) - github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) - github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) - github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) +- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) - glyphicons [LICENSE](http://glyphicons.com/license/) - golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- golang.org/x/tools [BSD LICENSE](https://github.com/golang/tools/blob/master/LICENSE) -- gopkg.in/fatih/pool.v2 [MIT LICENSE](https://github.com/fatih/pool/blob/v2.0.0/LICENSE) - jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt) - react 0.13.3 [BSD LICENSE](https://github.com/facebook/react/blob/master/LICENSE) diff --git a/vendor/github.com/influxdata/influxdb/client/README.md b/vendor/github.com/influxdata/influxdb/client/README.md index bd746f53ed..e55c43657f 100644 --- a/vendor/github.com/influxdata/influxdb/client/README.md +++ b/vendor/github.com/influxdata/influxdb/client/README.md @@ -26,7 +26,7 @@ Though not necessary for experimentation, you may want to create a new user and authenticate the connection to your database. For more information please check out the -[Admin Docs](https://docs.influxdata.com/influxdb/v0.10/administration). +[Admin Docs](https://docs.influxdata.com/influxdb/latest/administration/). For the impatient, you can create a new admin user _bubba_ by firing off the [InfluxDB CLI](https://github.com/influxdata/influxdb/blob/master/cmd/influx/main.go). @@ -49,10 +49,8 @@ the configuration below. package main import ( - "net/url" - "fmt" "log" - "os" + "time" "github.com/influxdata/influxdb/client/v2" ) @@ -70,17 +68,17 @@ func main() { Username: username, Password: password, }) - + if err != nil { log.Fatalln("Error: ", err) } - + // Create a new point batch bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: MyDB, Precision: "s", }) - + if err != nil { log.Fatalln("Error: ", err) } @@ -93,11 +91,11 @@ func main() { "user": 46.6, } pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now()) - + if err != nil { log.Fatalln("Error: ", err) } - + bp.AddPoint(pt) // Write the batch @@ -257,6 +255,28 @@ func WriteUDP() { } ``` +### Point Splitting + +The UDP client now supports splitting single points that exceed the configured +payload size. The logic for processing each point is listed here, starting with +an empty payload. + +1. 
If adding the point to the current (non-empty) payload would exceed the + configured size, send the current payload. Otherwise, add it to the current + payload. +1. If the point is smaller than the configured size, add it to the payload. +1. If the point has no timestamp, just try to send the entire point as a single + UDP payload, and process the next point. +1. Since the point has a timestamp, re-use the existing measurement name, + tagset, and timestamp and create multiple new points by splitting up the + fields. The per-point length will be kept close to the configured size, + staying under it if possible. This does mean that one large field, maybe a + long string, could be sent as a larger-than-configured payload. + +The above logic attempts to respect configured payload sizes, but not sacrifice +any data integrity. Points without a timestamp can't be split, as that may +cause fields to have differing timestamps when processed by the server. + ## Go Docs Please refer to diff --git a/vendor/github.com/influxdata/influxdb/client/influxdb.go b/vendor/github.com/influxdata/influxdb/client/influxdb.go index 84ec8e2776..c7f714f04a 100644 --- a/vendor/github.com/influxdata/influxdb/client/influxdb.go +++ b/vendor/github.com/influxdata/influxdb/client/influxdb.go @@ -181,7 +181,7 @@ func (c *Client) Query(q Query) (*Response, error) { } u.RawQuery = values.Encode() - req, err := http.NewRequest("GET", u.String(), nil) + req, err := http.NewRequest("POST", u.String(), nil) if err != nil { return nil, err } @@ -387,22 +387,31 @@ func (c *Client) Ping() (time.Duration, string, error) { // Structs +// Message represents a user message. +type Message struct { + Level string `json:"level,omitempty"` + Text string `json:"text,omitempty"` +} + // Result represents a resultset returned from a single statement. type Result struct { - Series []models.Row - Err error + Series []models.Row + Messages []*Message + Err error } // MarshalJSON encodes the result into JSON. func (r *Result) MarshalJSON() ([]byte, error) { // Define a struct that outputs "error" as a string. var o struct { - Series []models.Row `json:"series,omitempty"` - Err string `json:"error,omitempty"` + Series []models.Row `json:"series,omitempty"` + Messages []*Message `json:"messages,omitempty"` + Err string `json:"error,omitempty"` } // Copy fields to output struct. o.Series = r.Series + o.Messages = r.Messages if r.Err != nil { o.Err = r.Err.Error() } @@ -413,8 +422,9 @@ func (r *Result) MarshalJSON() ([]byte, error) { // UnmarshalJSON decodes the data into the Result struct func (r *Result) UnmarshalJSON(b []byte) error { var o struct { - Series []models.Row `json:"series,omitempty"` - Err string `json:"error,omitempty"` + Series []models.Row `json:"series,omitempty"` + Messages []*Message `json:"messages,omitempty"` + Err string `json:"error,omitempty"` } dec := json.NewDecoder(bytes.NewBuffer(b)) @@ -424,6 +434,7 @@ func (r *Result) UnmarshalJSON(b []byte) error { return err } r.Series = o.Series + r.Messages = o.Messages if o.Err != "" { r.Err = errors.New(o.Err) } @@ -487,17 +498,36 @@ func (r *Response) Error() error { return nil } +// duplexReader reads responses and writes it to another writer while +// satisfying the reader interface. 
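+//
+// In NewChunkedResponse the writer is the response's internal buffer, so
+// every chunk read off the stream is mirrored into it; if a chunk later
+// fails to decode, NextResponse can surface the raw server text in its
+// error message.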
+type duplexReader struct { + r io.Reader + w io.Writer +} + +func (r *duplexReader) Read(p []byte) (n int, err error) { + n, err = r.r.Read(p) + if err == nil { + r.w.Write(p[:n]) + } + return n, err +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { - dec *json.Decoder + dec *json.Decoder + duplex *duplexReader + buf bytes.Buffer } // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { - dec := json.NewDecoder(r) - dec.UseNumber() - return &ChunkedResponse{dec: dec} + resp := &ChunkedResponse{} + resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.dec = json.NewDecoder(resp.duplex) + resp.dec.UseNumber() + return resp } // NextResponse reads the next line of the stream and returns a response. @@ -507,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { if err == io.EOF { return nil, nil } - return nil, err + // A decoding error happened. This probably means the server crashed + // and sent a last-ditch error message to us. Ensure we have read the + // entirety of the connection to get any remaining error text. + io.Copy(ioutil.Discard, r.duplex) + return nil, errors.New(strings.TrimSpace(r.buf.String())) } + r.buf.Reset() return &response, nil } @@ -551,7 +586,7 @@ func (p *Point) MarshalJSON() ([]byte, error) { // MarshalString renders string representation of a Point with specified // precision. The default precision is nanoseconds. func (p *Point) MarshalString() string { - pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time) + pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time) if err != nil { return "# ERROR: " + err.Error() + " " + p.Measurement } diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb/client/v2/client.go new file mode 100644 index 0000000000..7ada440afc --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/client/v2/client.go @@ -0,0 +1,501 @@ +package client + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + "github.com/influxdata/influxdb/models" +) + +// HTTPConfig is the config data needed to create an HTTP Client +type HTTPConfig struct { + // Addr should be of the form "http://host:port" + // or "http://[ipv6-host%zone]:port". + Addr string + + // Username is the influxdb username, optional + Username string + + // Password is the influxdb password, optional + Password string + + // UserAgent is the http User Agent, defaults to "InfluxDBClient" + UserAgent string + + // Timeout for influxdb writes, defaults to no timeout + Timeout time.Duration + + // InsecureSkipVerify gets passed to the http client, if true, it will + // skip https certificate verification. Defaults to false + InsecureSkipVerify bool + + // TLSConfig allows the user to set their own TLS config for the HTTP + // Client. If set, this option overrides InsecureSkipVerify. 
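	//
	// A minimal sketch of supplying one (the CA pool here is illustrative):
	//
	//	TLSConfig: &tls.Config{RootCAs: caCertPool},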
+ TLSConfig *tls.Config +} + +// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct +type BatchPointsConfig struct { + // Precision is the write precision of the points, defaults to "ns" + Precision string + + // Database is the database to write points to + Database string + + // RetentionPolicy is the retention policy of the points + RetentionPolicy string + + // Write consistency is the number of servers required to confirm write + WriteConsistency string +} + +// Client is a client interface for writing & querying the database +type Client interface { + // Ping checks that status of cluster, and will always return 0 time and no + // error for UDP clients + Ping(timeout time.Duration) (time.Duration, string, error) + + // Write takes a BatchPoints object and writes all Points to InfluxDB. + Write(bp BatchPoints) error + + // Query makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + Query(q Query) (*Response, error) + + // Close releases any resources a Client may be using. + Close() error +} + +// NewHTTPClient returns a new Client from the provided config. +// Client is safe for concurrent use by multiple goroutines. +func NewHTTPClient(conf HTTPConfig) (Client, error) { + if conf.UserAgent == "" { + conf.UserAgent = "InfluxDBClient" + } + + u, err := url.Parse(conf.Addr) + if err != nil { + return nil, err + } else if u.Scheme != "http" && u.Scheme != "https" { + m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+ + " must start with http:// or https://", u.Scheme) + return nil, errors.New(m) + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: conf.InsecureSkipVerify, + }, + } + if conf.TLSConfig != nil { + tr.TLSClientConfig = conf.TLSConfig + } + return &client{ + url: *u, + username: conf.Username, + password: conf.Password, + useragent: conf.UserAgent, + httpClient: &http.Client{ + Timeout: conf.Timeout, + Transport: tr, + }, + transport: tr, + }, nil +} + +// Ping will check to see if the server is up with an optional timeout on waiting for leader. +// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. +func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { + now := time.Now() + u := c.url + u.Path = "ping" + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return 0, "", err + } + + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + if timeout > 0 { + params := req.URL.Query() + params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds())) + req.URL.RawQuery = params.Encode() + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return 0, "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, "", err + } + + if resp.StatusCode != http.StatusNoContent { + var err = fmt.Errorf(string(body)) + return 0, "", err + } + + version := resp.Header.Get("X-Influxdb-Version") + return time.Since(now), version, nil +} + +// Close releases the client's resources. +func (c *client) Close() error { + c.transport.CloseIdleConnections() + return nil +} + +// client is safe for concurrent use as the fields are all read-only +// once the client is instantiated. +type client struct { + // N.B - if url.UserInfo is accessed in future modifications to the + // methods on client, you will need to syncronise access to url. 
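	// (Ping, Query and Write each copy url by value before setting the
	// request path, which is why no locking is needed today.)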
+ url url.URL + username string + password string + useragent string + httpClient *http.Client + transport *http.Transport +} + +// BatchPoints is an interface into a batched grouping of points to write into +// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate +// batch for each goroutine. +type BatchPoints interface { + // AddPoint adds the given point to the Batch of points + AddPoint(p *Point) + // AddPoints adds the given points to the Batch of points + AddPoints(ps []*Point) + // Points lists the points in the Batch + Points() []*Point + + // Precision returns the currently set precision of this Batch + Precision() string + // SetPrecision sets the precision of this batch. + SetPrecision(s string) error + + // Database returns the currently set database of this Batch + Database() string + // SetDatabase sets the database of this Batch + SetDatabase(s string) + + // WriteConsistency returns the currently set write consistency of this Batch + WriteConsistency() string + // SetWriteConsistency sets the write consistency of this Batch + SetWriteConsistency(s string) + + // RetentionPolicy returns the currently set retention policy of this Batch + RetentionPolicy() string + // SetRetentionPolicy sets the retention policy of this Batch + SetRetentionPolicy(s string) +} + +// NewBatchPoints returns a BatchPoints interface based on the given config. +func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) { + if conf.Precision == "" { + conf.Precision = "ns" + } + if _, err := time.ParseDuration("1" + conf.Precision); err != nil { + return nil, err + } + bp := &batchpoints{ + database: conf.Database, + precision: conf.Precision, + retentionPolicy: conf.RetentionPolicy, + writeConsistency: conf.WriteConsistency, + } + return bp, nil +} + +type batchpoints struct { + points []*Point + database string + precision string + retentionPolicy string + writeConsistency string +} + +func (bp *batchpoints) AddPoint(p *Point) { + bp.points = append(bp.points, p) +} + +func (bp *batchpoints) AddPoints(ps []*Point) { + bp.points = append(bp.points, ps...) +} + +func (bp *batchpoints) Points() []*Point { + return bp.points +} + +func (bp *batchpoints) Precision() string { + return bp.precision +} + +func (bp *batchpoints) Database() string { + return bp.database +} + +func (bp *batchpoints) WriteConsistency() string { + return bp.writeConsistency +} + +func (bp *batchpoints) RetentionPolicy() string { + return bp.retentionPolicy +} + +func (bp *batchpoints) SetPrecision(p string) error { + if _, err := time.ParseDuration("1" + p); err != nil { + return err + } + bp.precision = p + return nil +} + +func (bp *batchpoints) SetDatabase(db string) { + bp.database = db +} + +func (bp *batchpoints) SetWriteConsistency(wc string) { + bp.writeConsistency = wc +} + +func (bp *batchpoints) SetRetentionPolicy(rp string) { + bp.retentionPolicy = rp +} + +// Point represents a single data point +type Point struct { + pt models.Point +} + +// NewPoint returns a point with the given timestamp. If a timestamp is not +// given, then data is sent to the database without a timestamp, in which case +// the server will assign local time upon reception. NOTE: it is recommended to +// send data with a timestamp. 
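+//
+// A minimal sketch (measurement, tag and field names are illustrative):
+//
+//	pt, err := client.NewPoint(
+//		"cpu_usage",
+//		map[string]string{"cpu": "cpu-total"},
+//		map[string]interface{}{"idle": 10.1, "system": 53.3},
+//		time.Now(),
+//	)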
+func NewPoint( + name string, + tags map[string]string, + fields map[string]interface{}, + t ...time.Time, +) (*Point, error) { + var T time.Time + if len(t) > 0 { + T = t[0] + } + + pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) + if err != nil { + return nil, err + } + return &Point{ + pt: pt, + }, nil +} + +// String returns a line-protocol string of the Point +func (p *Point) String() string { + return p.pt.String() +} + +// PrecisionString returns a line-protocol string of the Point, at precision +func (p *Point) PrecisionString(precison string) string { + return p.pt.PrecisionString(precison) +} + +// Name returns the measurement name of the point +func (p *Point) Name() string { + return p.pt.Name() +} + +// Tags returns the tags associated with the point +func (p *Point) Tags() map[string]string { + return p.pt.Tags().Map() +} + +// Time return the timestamp for the point +func (p *Point) Time() time.Time { + return p.pt.Time() +} + +// UnixNano returns the unix nano time of the point +func (p *Point) UnixNano() int64 { + return p.pt.UnixNano() +} + +// Fields returns the fields for the point +func (p *Point) Fields() map[string]interface{} { + return p.pt.Fields() +} + +// NewPointFrom returns a point from the provided models.Point. +func NewPointFrom(pt models.Point) *Point { + return &Point{pt: pt} +} + +func (c *client) Write(bp BatchPoints) error { + var b bytes.Buffer + + for _, p := range bp.Points() { + if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { + return err + } + + if err := b.WriteByte('\n'); err != nil { + return err + } + } + + u := c.url + u.Path = "write" + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("db", bp.Database()) + params.Set("rp", bp.RetentionPolicy()) + params.Set("precision", bp.Precision()) + params.Set("consistency", bp.WriteConsistency()) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + return err + } + + return nil +} + +// Query defines a query to send to the server +type Query struct { + Command string + Database string + Precision string +} + +// NewQuery returns a query object +// database and precision strings can be empty strings if they are not needed +// for the query. +func NewQuery(command, database, precision string) Query { + return Query{ + Command: command, + Database: database, + Precision: precision, + } +} + +// Response represents a list of statement results. +type Response struct { + Results []Result + Err string `json:"error,omitempty"` +} + +// Error returns the first error from any statement. +// Returns nil if no errors occurred on any statements. +func (r *Response) Error() error { + if r.Err != "" { + return fmt.Errorf(r.Err) + } + for _, result := range r.Results { + if result.Err != "" { + return fmt.Errorf(result.Err) + } + } + return nil +} + +// Message represents a user message. +type Message struct { + Level string + Text string +} + +// Result represents a resultset returned from a single statement. 
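+// Unlike the v1 client's Result, Err is carried as a plain string matching
+// the JSON wire format, rather than as an error value that needs custom
+// marshaling.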
+type Result struct { + Series []models.Row + Messages []*Message + Err string `json:"error,omitempty"` +} + +// Query sends a command to the server and returns the Response +func (c *client) Query(q Query) (*Response, error) { + u := c.url + u.Path = "query" + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + decErr := dec.Decode(&response) + + // ignore this error if we got an invalid status code + if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { + decErr = nil + } + // If we got a valid decode error, send that back + if decErr != nil { + return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) + } + // If we don't have an error in our json response, and didn't get statusOK + // then send back an error + if resp.StatusCode != http.StatusOK && response.Error() == nil { + return &response, fmt.Errorf("received status code %d from server", + resp.StatusCode) + } + return &response, nil +} diff --git a/vendor/github.com/influxdata/influxdb/client/v2/udp.go b/vendor/github.com/influxdata/influxdb/client/v2/udp.go new file mode 100644 index 0000000000..aff07d0f2a --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/client/v2/udp.go @@ -0,0 +1,112 @@ +package client + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + // UDPPayloadSize is a reasonable default payload size for UDP packets that + // could be travelling over the internet. + UDPPayloadSize = 512 +) + +// UDPConfig is the config data needed to create a UDP Client +type UDPConfig struct { + // Addr should be of the form "host:port" + // or "[ipv6-host%zone]:port". + Addr string + + // PayloadSize is the maximum size of a UDP client message, optional + // Tune this based on your network. Defaults to UDPPayloadSize. + PayloadSize int +} + +// NewUDPClient returns a client interface for writing to an InfluxDB UDP +// service from the given config. +func NewUDPClient(conf UDPConfig) (Client, error) { + var udpAddr *net.UDPAddr + udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr) + if err != nil { + return nil, err + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + + payloadSize := conf.PayloadSize + if payloadSize == 0 { + payloadSize = UDPPayloadSize + } + + return &udpclient{ + conn: conn, + payloadSize: payloadSize, + }, nil +} + +// Close releases the udpclient's resources. 
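+// Close is the only method that releases the underlying socket, so it is
+// worth deferring as soon as NewUDPClient succeeds; Write and Ping never
+// close the connection themselves.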
+func (uc *udpclient) Close() error { + return uc.conn.Close() +} + +type udpclient struct { + conn io.WriteCloser + payloadSize int +} + +func (uc *udpclient) Write(bp BatchPoints) error { + var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed + var d, _ = time.ParseDuration("1" + bp.Precision()) + + var delayedError error + + var checkBuffer = func(n int) { + if len(b) > 0 && len(b)+n > uc.payloadSize { + if _, err := uc.conn.Write(b); err != nil { + delayedError = err + } + b = b[:0] + } + } + + for _, p := range bp.Points() { + p.pt.Round(d) + pointSize := p.pt.StringSize() + 1 // include newline in size + //point := p.pt.RoundedString(d) + "\n" + + checkBuffer(pointSize) + + if p.Time().IsZero() || pointSize <= uc.payloadSize { + b = p.pt.AppendString(b) + b = append(b, '\n') + continue + } + + points := p.pt.Split(uc.payloadSize - 1) // account for newline character + for _, sp := range points { + checkBuffer(sp.StringSize() + 1) + b = sp.AppendString(b) + b = append(b, '\n') + } + } + + if len(b) > 0 { + if _, err := uc.conn.Write(b); err != nil { + return err + } + } + return delayedError +} + +func (uc *udpclient) Query(q Query) (*Response, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + +func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { + return 0, "", nil +} diff --git a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go b/vendor/github.com/influxdata/influxdb/models/inline_fnv.go new file mode 100644 index 0000000000..9391946d82 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/inline_fnv.go @@ -0,0 +1,27 @@ +package models + +// from stdlib hash/fnv/fnv.go +const ( + prime64 = 1099511628211 + offset64 = 14695981039346656037 +) + +// InlineFNV64a is an alloc-free port of the standard library's fnv64a. +type InlineFNV64a uint64 + +func NewInlineFNV64a() InlineFNV64a { + return offset64 +} + +func (s *InlineFNV64a) Write(data []byte) (int, error) { + hash := uint64(*s) + for _, c := range data { + hash ^= uint64(c) + hash *= prime64 + } + *s = InlineFNV64a(hash) + return len(data), nil +} +func (s *InlineFNV64a) Sum64() uint64 { + return uint64(*s) +} diff --git a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go b/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go new file mode 100644 index 0000000000..727ce35802 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go @@ -0,0 +1,38 @@ +package models + +import ( + "reflect" + "strconv" + "unsafe" +) + +// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt. +func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) { + s := unsafeBytesToString(b) + return strconv.ParseInt(s, base, bitSize) +} + +// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat. +func parseFloatBytes(b []byte, bitSize int) (float64, error) { + s := unsafeBytesToString(b) + return strconv.ParseFloat(s, bitSize) +} + +// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool. +func parseBoolBytes(b []byte) (bool, error) { + return strconv.ParseBool(unsafeBytesToString(b)) +} + +// unsafeBytesToString converts a []byte to a string without a heap allocation. +// +// It is unsafe, and is intended to prepare input to short-lived functions +// that require strings. 
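+//
+// The returned string aliases in's backing array, so in must not be
+// modified while the returned string is still in use.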
+func unsafeBytesToString(in []byte) string {
+	src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
+	dst := reflect.StringHeader{
+		Data: src.Data,
+		Len:  src.Len,
+	}
+	s := *(*string)(unsafe.Pointer(&dst))
+	return s
+}
diff --git a/vendor/github.com/influxdata/influxdb/models/points.go b/vendor/github.com/influxdata/influxdb/models/points.go
index 53efd9fceb..a4104b8a3f 100644
--- a/vendor/github.com/influxdata/influxdb/models/points.go
+++ b/vendor/github.com/influxdata/influxdb/models/points.go
@@ -5,7 +5,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"hash/fnv"
 	"math"
 	"sort"
 	"strconv"
@@ -29,6 +28,7 @@ var (
 	ErrPointMustHaveAField = errors.New("point without fields is unsupported")
 
 	ErrInvalidNumber = errors.New("invalid number")
+	ErrInvalidPoint  = errors.New("point is invalid")
 
 	ErrMaxKeyLengthExceeded = errors.New("max key length exceeded")
 )
@@ -74,6 +74,48 @@ type Point interface {
 	// is a timestamp associated with the point, then it will be rounded to the
 	// given duration
 	RoundedString(d time.Duration) string
+
+	// Split will attempt to return multiple points with the same timestamp whose
+	// string representations are no longer than size. Points with a single field or
+	// a point without a timestamp may exceed the requested size.
+	Split(size int) []Point
+
+	// Round will round the timestamp of the point to the given duration
+	Round(d time.Duration)
+
+	// StringSize returns the length of the string that would be returned by String()
+	StringSize() int
+
+	// AppendString appends the result of String() to the provided buffer and returns
+	// the result, potentially reducing string allocations
+	AppendString(buf []byte) []byte
+
+	// FieldIterator returns a FieldIterator that can be used to traverse the
+	// fields of a point without constructing the in-memory map
+	FieldIterator() FieldIterator
+}
+
+type FieldType int
+
+const (
+	Integer FieldType = iota
+	Float
+	Boolean
+	String
+	Empty
+)
+
+type FieldIterator interface {
+	Next() bool
+	FieldKey() []byte
+	Type() FieldType
+	StringValue() string
+	IntegerValue() int64
+	BooleanValue() bool
+	FloatValue() float64
+
+	Delete()
+	Reset()
 }
 
 // Points represents a sortable list of points by timestamp.
@@ -106,6 +148,11 @@ type point struct {
 
 	// cached version of parsed name from key
 	cachedName string
+
+	// cached version of parsed tags
+	cachedTags Tags
+
+	it fieldIterator
 }
 
 const (
@@ -138,27 +185,35 @@ func ParsePointsString(buf string) ([]Point, error) {
 }
 
 // ParseKey returns the measurement name and tags from a point.
-func ParseKey(buf string) (string, Tags, error) {
-	_, keyBuf, err := scanKey([]byte(buf), 0)
-	tags := parseTags([]byte(buf))
-	return string(keyBuf), tags, err
+func ParseKey(buf []byte) (string, Tags, error) {
+	// Ignore the error because scanMeasurement returns "missing fields" which we ignore
+	// when just parsing a key
+	state, i, _ := scanMeasurement(buf, 0)
+
+	var tags Tags
+	if state == tagKeyState {
+		tags = parseTags(buf)
+		// scanMeasurement returns the location of the comma if there are tags, strip that off
+		return string(buf[:i-1]), tags, nil
+	}
+	return string(buf[:i]), tags, nil
 }
 
 // ParsePointsWithPrecision is similar to ParsePoints, but allows the
 // caller to provide a precision for time.
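+//
+// A minimal sketch (the line-protocol text and precision are illustrative):
+//
+//	pts, err := ParsePointsWithPrecision(
+//		[]byte(`cpu,host=serverA value=0.64 1434055562000000000`),
+//		time.Now().UTC(), "n")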
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { - points := []Point{} + points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) var ( pos int block []byte failed []string ) - for { + for pos < len(buf) { pos, block = scanLine(buf, pos) pos++ if len(block) == 0 { - break + continue } // lines which start with '#' are comments @@ -178,17 +233,13 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin block = block[:len(block)-1] } - pt, err := parsePoint(block[start:len(block)], defaultTime, precision) + pt, err := parsePoint(block[start:], defaultTime, precision) if err != nil { failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:len(block)]), err)) } else { points = append(points, pt) } - if pos >= len(buf) { - break - } - } if len(failed) > 0 { return points, fmt.Errorf("%s", strings.Join(failed, "\n")) @@ -226,7 +277,6 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err // scan the last block which is an optional integer timestamp pos, ts, err := scanTime(buf, pos) - if err != nil { return nil, err } @@ -241,7 +291,7 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err pt.time = defaultTime pt.SetPrecision(precision) } else { - ts, err := strconv.ParseInt(string(ts), 10, 64) + ts, err := parseIntBytes(ts, 10, 64) if err != nil { return nil, err } @@ -249,6 +299,15 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err if err != nil { return nil, err } + + // Determine if there are illegal non-whitespace characters after the + // timestamp block. + for pos < len(buf) { + if buf[pos] != ' ' { + return nil, ErrInvalidPoint + } + pos++ + } } return pt, nil } @@ -307,24 +366,24 @@ func scanKey(buf []byte, i int) (int, []byte, error) { } } - // Now we know where the key region is within buf, and the locations of tags, we - // need to determine if duplicate tags exist and if the tags are sorted. This iterates - // 1/2 of the list comparing each end with each other, walking towards the center from - // both sides. - for j := 0; j < commas/2; j++ { + // Now we know where the key region is within buf, and the location of tags, we + // need to determine if duplicate tags exist and if the tags are sorted. This iterates + // over the list comparing each tag in the sequence with each other. + for j := 0; j < commas-1; j++ { // get the left and right tags _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') - _, right := scanTo(buf[indices[commas-j-1]:indices[commas-j]-1], 0, '=') + _, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=') - // If the tags are equal, then there are duplicate tags, and we should abort - if bytes.Equal(left, right) { - return i, buf[start:i], fmt.Errorf("duplicate tags") - } - - // If left is greater than right, the tags are not sorted. We must continue - // since their could be duplicate tags still. - if bytes.Compare(left, right) > 0 { + // If left is greater than right, the tags are not sorted. We do not have to + // continue because the short path no longer works. + // If the tags are equal, then there are duplicate tags, and we should abort. + // If the tags are not sorted, this pass may not find duplicate tags and we + // need to do a more exhaustive search later. 
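+		// For example, with tags `b=1,a=2,b=3` this pass only observes
+		// that the set is unsorted (b > a) and breaks; the duplicate `b`
+		// is caught by the re-check below, once the tags have been sorted.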
+ if cmp := bytes.Compare(left, right); cmp > 0 { sorted = false + break + } else if cmp == 0 { + return i, buf[start:i], fmt.Errorf("duplicate tags") } } @@ -350,6 +409,20 @@ func scanKey(buf []byte, i int) (int, []byte, error) { pos += copy(b[pos:], v) } + // Check again for duplicate tags now that the tags are sorted. + for j := 0; j < commas-1; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:], 0, '=') + _, right := scanTo(buf[indices[j+1]:], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort. + // If the tags are not sorted, this pass may not find duplicate tags and we + // need to do a more exhaustive search later. + if bytes.Equal(left, right) { + return i, b, fmt.Errorf("duplicate tags") + } + } + return i, b, nil } @@ -370,7 +443,7 @@ func scanMeasurement(buf []byte, i int) (int, int, error) { // Check first byte of measurement, anything except a comma is fine. // It can't be a space, since whitespace is stripped prior to this // function call. - if buf[i] == ',' { + if i >= len(buf) || buf[i] == ',' { return -1, i, fmt.Errorf("missing measurement") } @@ -517,15 +590,6 @@ func less(buf []byte, indices []int, i, j int) bool { return bytes.Compare(a, b) < 0 } -func isFieldEscapeChar(b byte) bool { - for c := range escape.Codes { - if c == b { - return true - } - } - return false -} - // scanFields scans buf, starting at i for the fields section of a point. It returns // the ending position and the byte slice of the fields within buf func scanFields(buf []byte, i int) (int, []byte, error) { @@ -626,32 +690,34 @@ func scanFields(buf []byte, i int) (int, []byte, error) { return i, buf[start:i], nil } -// scanTime scans buf, starting at i for the time section of a point. It returns -// the ending position and the byte slice of the fields within buf and error if the -// timestamp is not in the correct numeric format +// scanTime scans buf, starting at i for the time section of a point. It +// returns the ending position and the byte slice of the timestamp within buf +// and and error if the timestamp is not in the correct numeric format. func scanTime(buf []byte, i int) (int, []byte, error) { start := skipWhitespace(buf, i) i = start + for { // reached the end of buf? if i >= len(buf) { break } - // Timestamps should be integers, make sure they are so we don't need to actually - // parse the timestamp until needed - if buf[i] < '0' || buf[i] > '9' { - // Handle negative timestamps - if i == start && buf[i] == '-' { - i++ - continue - } - return i, buf[start:i], fmt.Errorf("bad timestamp") + // Reached end of block or trailing whitespace? + if buf[i] == '\n' || buf[i] == ' ' { + break } - // reached end of block? - if buf[i] == '\n' { - break + // Handle negative timestamps + if i == start && buf[i] == '-' { + i++ + continue + } + + // Timestamps should be integers, make sure they are so we don't need + // to actually parse the timestamp until needed. 
+ if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") } i++ } @@ -762,14 +828,14 @@ func scanNumber(buf []byte, i int) (int, error) { // Parse the int to check bounds the number of digits could be larger than the max range // We subtract 1 from the index to remove the `i` from our tests if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { - if _, err := strconv.ParseInt(string(buf[start:i-1]), 10, 64); err != nil { + if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil { return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) } } } else { // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { - if _, err := strconv.ParseFloat(string(buf[start:i]), 10); err != nil { + if _, err := parseFloatBytes(buf[start:i], 10); err != nil { return i, fmt.Errorf("invalid float") } } @@ -971,13 +1037,9 @@ func scanTagValue(buf []byte, i int) (int, []byte) { func scanFieldValue(buf []byte, i int) (int, []byte) { start := i quoted := false - for { - if i >= len(buf) { - break - } - - // Only escape char for a field value is a double-quote - if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' { + for i < len(buf) { + // Only escape char for a field value is a double-quote and backslash + if buf[i] == '\\' && i+1 < len(buf) && (buf[i+1] == '"' || buf[i+1] == '\\') { i += 2 continue } @@ -1021,6 +1083,10 @@ func escapeTag(in []byte) []byte { } func unescapeTag(in []byte) []byte { + if bytes.IndexByte(in, '\\') == -1 { + return in + } + for b, esc := range tagEscapeCodes { if bytes.IndexByte(in, b) != -1 { in = bytes.Replace(in, esc, []byte{b}, -1) @@ -1029,9 +1095,9 @@ func unescapeTag(in []byte) []byte { return in } -// escapeStringField returns a copy of in with any double quotes or +// EscapeStringField returns a copy of in with any double quotes or // backslashes with escaped values -func escapeStringField(in string) string { +func EscapeStringField(in string) string { var out []byte i := 0 for { @@ -1062,6 +1128,10 @@ func escapeStringField(in string) string { // unescapeStringField returns a copy of in with any escaped double-quotes // or backslashes unescaped func unescapeStringField(in string) string { + if strings.IndexByte(in, '\\') == -1 { + return in + } + var out []byte i := 0 for { @@ -1089,20 +1159,42 @@ func unescapeStringField(in string) string { // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If // an unsupported field value (NaN) or out of range time is passed, this function returns an error. 
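+//
+// For example, NewPoint("m", nil, Fields{"v": math.NaN()}, time.Now())
+// returns an error rather than emitting an unparseable point.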
-func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, error) { +func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) { + key, err := pointKey(name, tags, fields, t) + if err != nil { + return nil, err + } + + return &point{ + key: key, + time: t, + fields: fields.MarshalBinary(), + }, nil +} + +// pointKey checks some basic requirements for valid points, and returns the +// key, along with an possible error +func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) { if len(fields) == 0 { return nil, ErrPointMustHaveAField } - if !time.IsZero() { - if err := CheckTime(time); err != nil { + + if !t.IsZero() { + if err := CheckTime(t); err != nil { return nil, err } } for key, value := range fields { - if fv, ok := value.(float64); ok { + switch value := value.(type) { + case float64: // Ensure the caller validates and handles invalid field values - if math.IsNaN(fv) { + if math.IsNaN(value) { + return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) + } + case float32: + // Ensure the caller validates and handles invalid field values + if math.IsNaN(float64(value)) { return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) } } @@ -1111,16 +1203,12 @@ func NewPoint(name string, tags Tags, fields Fields, time time.Time) (Point, err } } - key := MakeKey([]byte(name), tags) + key := MakeKey([]byte(measurement), tags) if len(key) > MaxKeyLength { return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) } - return &point{ - key: key, - time: time, - fields: fields.MarshalBinary(), - }, nil + return key, nil } // NewPointFromBytes returns a new Point from a marshalled Point. @@ -1187,40 +1275,57 @@ func (p *point) SetTime(t time.Time) { p.time = t } +// Round implements Point.Round +func (p *point) Round(d time.Duration) { + p.time = p.time.Round(d) +} + // Tags returns the tag set for the point func (p *point) Tags() Tags { - return parseTags(p.key) + if p.cachedTags != nil { + return p.cachedTags + } + p.cachedTags = parseTags(p.key) + return p.cachedTags } func parseTags(buf []byte) Tags { - tags := map[string]string{} - - if len(buf) != 0 { - pos, name := scanTo(buf, 0, ',') - - // it's an empyt key, so there are no tags - if len(name) == 0 { - return tags - } - - i := pos + 1 - var key, value []byte - for { - if i >= len(buf) { - break - } - i, key = scanTo(buf, i, '=') - i, value = scanTagValue(buf, i+1) - - if len(value) == 0 { - continue - } - - tags[string(unescapeTag(key))] = string(unescapeTag(value)) - - i++ - } + if len(buf) == 0 { + return nil } + + pos, name := scanTo(buf, 0, ',') + + // it's an empty key, so there are no tags + if len(name) == 0 { + return nil + } + + tags := make(Tags, 0, bytes.Count(buf, []byte(","))) + hasEscape := bytes.IndexByte(buf, '\\') != -1 + + i := pos + 1 + var key, value []byte + for { + if i >= len(buf) { + break + } + i, key = scanTo(buf, i, '=') + i, value = scanTagValue(buf, i+1) + + if len(value) == 0 { + continue + } + + if hasEscape { + tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)}) + } else { + tags = append(tags, Tag{Key: key, Value: value}) + } + + i++ + } + return tags } @@ -1234,12 +1339,15 @@ func MakeKey(name []byte, tags Tags) []byte { // SetTags replaces the tags for the point func (p *point) SetTags(tags Tags) { p.key = MakeKey([]byte(p.Name()), tags) + p.cachedTags = tags } // AddTag adds or replaces a tag value for a point func (p *point) AddTag(key, value string) { 
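+	// Append-then-sort preserves the sorted-by-key invariant documented on
+	// Tags, and caching the sorted set lets Tags() skip re-parsing the key.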
tags := p.Tags() - tags[key] = value + tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)}) + sort.Sort(tags) + p.cachedTags = tags p.key = MakeKey([]byte(p.Name()), tags) } @@ -1276,6 +1384,41 @@ func (p *point) String() string { return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10) } +// AppendString implements Point.AppendString +func (p *point) AppendString(buf []byte) []byte { + buf = append(buf, p.key...) + buf = append(buf, ' ') + buf = append(buf, p.fields...) + + if !p.time.IsZero() { + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, p.UnixNano(), 10) + } + + return buf +} + +func (p *point) StringSize() int { + size := len(p.key) + len(p.fields) + 1 + + if !p.time.IsZero() { + digits := 1 // even "0" has one digit + t := p.UnixNano() + if t < 0 { + // account for negative sign, then negate + digits++ + t = -t + } + for t > 9 { // already accounted for one digit + digits++ + t /= 10 + } + size += digits + 1 // digits and a space + } + + return size +} + func (p *point) MarshalBinary() ([]byte, error) { tb, err := p.time.MarshalBinary() if err != nil { @@ -1335,11 +1478,28 @@ func (p *point) RoundedString(d time.Duration) string { } func (p *point) unmarshalBinary() Fields { - return newFieldsFromBinary(p.fields) + iter := p.FieldIterator() + fields := make(Fields, 8) + for iter.Next() { + if len(iter.FieldKey()) == 0 { + continue + } + switch iter.Type() { + case Float: + fields[string(iter.FieldKey())] = iter.FloatValue() + case Integer: + fields[string(iter.FieldKey())] = iter.IntegerValue() + case String: + fields[string(iter.FieldKey())] = iter.StringValue() + case Boolean: + fields[string(iter.FieldKey())] = iter.BooleanValue() + } + } + return fields } func (p *point) HashID() uint64 { - h := fnv.New64a() + h := NewInlineFNV64a() h.Write(p.key) sum := h.Sum64() return sum @@ -1349,50 +1509,176 @@ func (p *point) UnixNano() int64 { return p.Time().UnixNano() } -// Tags represents a mapping between a Point's tag names and their -// values. -type Tags map[string]string +func (p *point) Split(size int) []Point { + if p.time.IsZero() || len(p.String()) <= size { + return []Point{p} + } + + // key string, timestamp string, spaces + size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2 + + var points []Point + var start, cur int + + for cur < len(p.fields) { + end, _ := scanTo(p.fields, cur, '=') + end, _ = scanFieldValue(p.fields, end+1) + + if cur > start && end-start > size { + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start : cur-1], + }) + start = cur + } + + cur = end + 1 + } + + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start:], + }) + + return points +} + +// Tag represents a single key/value tag pair. +type Tag struct { + Key []byte + Value []byte +} + +// Tags represents a sorted list of tags. +type Tags []Tag + +// NewTags returns a new Tags from a map. +func NewTags(m map[string]string) Tags { + if len(m) == 0 { + return nil + } + a := make(Tags, 0, len(m)) + for k, v := range m { + a = append(a, Tag{Key: []byte(k), Value: []byte(v)}) + } + sort.Sort(a) + return a +} + +func (a Tags) Len() int { return len(a) } +func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 } +func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Get returns the value for a key. +func (a Tags) Get(key []byte) []byte { + // OPTIMIZE: Use sort.Search if tagset is large. 
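+	// Tags is kept sorted by key (NewTags, Set and AddTag all sort), so
+	// sort.Search would also be correct here.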
+ + for _, t := range a { + if bytes.Equal(t.Key, key) { + return t.Value + } + } + return nil +} + +// GetString returns the string value for a string key. +func (a Tags) GetString(key string) string { + return string(a.Get([]byte(key))) +} + +// Set sets the value for a key. +func (a *Tags) Set(key, value []byte) { + for _, t := range *a { + if bytes.Equal(t.Key, key) { + t.Value = value + return + } + } + *a = append(*a, Tag{Key: key, Value: value}) + sort.Sort(*a) +} + +// SetString sets the string value for a string key. +func (a *Tags) SetString(key, value string) { + a.Set([]byte(key), []byte(value)) +} + +// Delete removes a tag by key. +func (a *Tags) Delete(key []byte) { + for i, t := range *a { + if bytes.Equal(t.Key, key) { + copy((*a)[i:], (*a)[i+1:]) + (*a)[len(*a)-1] = Tag{} + *a = (*a)[:len(*a)-1] + return + } + } +} + +// Map returns a map representation of the tags. +func (a Tags) Map() map[string]string { + m := make(map[string]string, len(a)) + for _, t := range a { + m[string(t.Key)] = string(t.Value) + } + return m +} + +// Merge merges the tags combining the two. If both define a tag with the +// same key, the merged value overwrites the old value. +// A new map is returned. +func (a Tags) Merge(other map[string]string) Tags { + merged := make(map[string]string, len(a)+len(other)) + for _, t := range a { + merged[string(t.Key)] = string(t.Value) + } + for k, v := range other { + merged[k] = v + } + return NewTags(merged) +} // HashKey hashes all of a tag's keys. -func (t Tags) HashKey() []byte { +func (a Tags) HashKey() []byte { // Empty maps marshal to empty bytes. - if len(t) == 0 { + if len(a) == 0 { return nil } - escaped := Tags{} - for k, v := range t { - ek := escapeTag([]byte(k)) - ev := escapeTag([]byte(v)) + escaped := make(Tags, 0, len(a)) + for _, t := range a { + ek := escapeTag(t.Key) + ev := escapeTag(t.Value) if len(ev) > 0 { - escaped[string(ek)] = string(ev) + escaped = append(escaped, Tag{Key: ek, Value: ev}) } } // Extract keys and determine final size. sz := len(escaped) + (len(escaped) * 2) // separators - keys := make([]string, len(escaped)+1) - i := 0 - for k, v := range escaped { - keys[i] = k - i++ - sz += len(k) + len(v) + keys := make([][]byte, len(escaped)+1) + for i, t := range escaped { + keys[i] = t.Key + sz += len(t.Key) + len(t.Value) } - keys = keys[:i] - sort.Strings(keys) + keys = keys[:len(escaped)] + sort.Sort(byteSlices(keys)) + // Generate marshaled bytes. b := make([]byte, sz) buf := b idx := 0 - for _, k := range keys { + for i, k := range keys { buf[idx] = ',' idx++ copy(buf[idx:idx+len(k)], k) idx += len(k) buf[idx] = '=' idx++ - v := escaped[k] + v := escaped[i].Value copy(buf[idx:idx+len(v)], v) idx += len(v) } @@ -1406,162 +1692,225 @@ type Fields map[string]interface{} func parseNumber(val []byte) (interface{}, error) { if val[len(val)-1] == 'i' { val = val[:len(val)-1] - return strconv.ParseInt(string(val), 10, 64) + return parseIntBytes(val, 10, 64) } for i := 0; i < len(val); i++ { // If there is a decimal or an N (NaN), I (Inf), parse as float if val[i] == '.' 
|| val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' { - return strconv.ParseFloat(string(val), 64) + return parseFloatBytes(val, 64) } if val[i] < '0' && val[i] > '9' { return string(val), nil } } - return strconv.ParseFloat(string(val), 64) + return parseFloatBytes(val, 64) } -func newFieldsFromBinary(buf []byte) Fields { - fields := Fields{} - var ( - i int - name, valueBuf []byte - value interface{} - err error - ) - for { - if i >= len(buf) { - break - } +func (p *point) FieldIterator() FieldIterator { + p.Reset() + return p +} - i, name = scanTo(buf, i, '=') - name = escape.Unescape(name) +type fieldIterator struct { + start, end int + key, keybuf []byte + valueBuf []byte + fieldType FieldType +} - i, valueBuf = scanFieldValue(buf, i+1) - if len(name) > 0 { - if len(valueBuf) == 0 { - fields[string(name)] = nil - continue - } - - // If the first char is a double-quote, then unmarshal as string - if valueBuf[0] == '"' { - value = unescapeStringField(string(valueBuf[1 : len(valueBuf)-1])) - // Check for numeric characters and special NaN or Inf - } else if (valueBuf[0] >= '0' && valueBuf[0] <= '9') || valueBuf[0] == '-' || valueBuf[0] == '.' || - valueBuf[0] == 'N' || valueBuf[0] == 'n' || // NaN - valueBuf[0] == 'I' || valueBuf[0] == 'i' { // Inf - - value, err = parseNumber(valueBuf) - if err != nil { - panic(fmt.Sprintf("unable to parse number value '%v': %v", string(valueBuf), err)) - } - - // Otherwise parse it as bool - } else { - value, err = strconv.ParseBool(string(valueBuf)) - if err != nil { - panic(fmt.Sprintf("unable to parse bool value '%v': %v\n", string(valueBuf), err)) - } - } - fields[string(name)] = value - } - i++ +func (p *point) Next() bool { + p.it.start = p.it.end + if p.it.start >= len(p.fields) { + return false } - return fields + + p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=') + if escape.IsEscaped(p.it.key) { + p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key) + p.it.key = p.it.keybuf + } + + p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1) + p.it.end++ + + if len(p.it.valueBuf) == 0 { + p.it.fieldType = Empty + return true + } + + c := p.it.valueBuf[0] + + if c == '"' { + p.it.fieldType = String + return true + } + + if strings.IndexByte(`0123456789-.nNiI`, c) >= 0 { + if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' { + p.it.fieldType = Integer + p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1] + } else { + p.it.fieldType = Float + } + return true + } + + // to keep the same behavior that currently exists, default to boolean + p.it.fieldType = Boolean + return true +} + +func (p *point) FieldKey() []byte { + return p.it.key +} + +func (p *point) Type() FieldType { + return p.it.fieldType +} + +func (p *point) StringValue() string { + return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1])) +} + +func (p *point) IntegerValue() int64 { + n, err := parseIntBytes(p.it.valueBuf, 10, 64) + if err != nil { + panic(fmt.Sprintf("unable to parse integer value %q: %v", p.it.valueBuf, err)) + } + return n +} + +func (p *point) BooleanValue() bool { + b, err := parseBoolBytes(p.it.valueBuf) + if err != nil { + panic(fmt.Sprintf("unable to parse bool value %q: %v", p.it.valueBuf, err)) + } + return b +} + +func (p *point) FloatValue() float64 { + f, err := parseFloatBytes(p.it.valueBuf, 64) + if err != nil { + // panic because that's what the non-iterator code does + panic(fmt.Sprintf("unable to parse floating point value %q: %v", p.it.valueBuf, err)) + } + return f +} + +func (p 
+func (p *point) Delete() {
+	switch {
+	case p.it.end == p.it.start:
+	case p.it.end >= len(p.fields):
+		p.fields = p.fields[:p.it.start]
+	case p.it.start == 0:
+		p.fields = p.fields[p.it.end:]
+	default:
+		p.fields = append(p.fields[:p.it.start], p.fields[p.it.end:]...)
+	}
+
+	p.it.end = p.it.start
+	p.it.key = nil
+	p.it.valueBuf = nil
+	p.it.fieldType = Empty
+}
+
+func (p *point) Reset() {
+	p.it.fieldType = Empty
+	p.it.key = nil
+	p.it.valueBuf = nil
+	p.it.start = 0
+	p.it.end = 0
+}
 
 // MarshalBinary encodes all the fields to their proper type and returns the binary
-// represenation
+// representation.
 // NOTE: uint64 is specifically not supported due to potential overflow when we decode
 // again later to an int64
+// NOTE2: uint is accepted even though it may also be 64 bits wide; see the
+// TODO in appendField below.
 func (p Fields) MarshalBinary() []byte {
-	b := []byte{}
-	keys := make([]string, len(p))
-	i := 0
+	var b []byte
+	keys := make([]string, 0, len(p))
+
 	for k := range p {
-		keys[i] = k
-		i++
+		keys = append(keys, k)
 	}
+
+	// Not strictly necessary, but sorting keeps the output deterministic.
 	sort.Strings(keys)
 
-	for _, k := range keys {
-		v := p[k]
-		b = append(b, []byte(escape.String(k))...)
-		b = append(b, '=')
-		switch t := v.(type) {
-		case int:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case int8:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case int16:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case int32:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case int64:
-			b = append(b, []byte(strconv.FormatInt(t, 10))...)
-			b = append(b, 'i')
-		case uint:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case uint8:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case uint16:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case uint32:
-			b = append(b, []byte(strconv.FormatInt(int64(t), 10))...)
-			b = append(b, 'i')
-		case float32:
-			val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32))
-			b = append(b, val...)
-		case float64:
-			val := []byte(strconv.FormatFloat(t, 'f', -1, 64))
-			b = append(b, val...)
-		case bool:
-			b = append(b, []byte(strconv.FormatBool(t))...)
-		case []byte:
-			b = append(b, t...)
-		case string:
-			b = append(b, '"')
-			b = append(b, []byte(escapeStringField(t))...)
-			b = append(b, '"')
-		case nil:
-			// skip
-		default:
-			// Can't determine the type, so convert to string
-			b = append(b, '"')
-			b = append(b, []byte(escapeStringField(fmt.Sprintf("%v", v)))...)
-			b = append(b, '"')
-
+	for i, k := range keys {
+		if i > 0 {
+			b = append(b, ',')
 		}
-		b = append(b, ',')
-	}
-	if len(b) > 0 {
-		return b[0 : len(b)-1]
+		b = appendField(b, k, p[k])
 	}
+
 	return b
 }
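To make the rewritten encoding concrete, a small sketch of what `Fields.MarshalBinary` now emits; the expected output in the comment is my reading of the rules above, not vendored text:

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
)

func main() {
	fields := models.Fields{
		"value": 0.64,       // floats are emitted bare
		"count": int64(3),   // signed integers get the trailing 'i'
		"msg":   `say "hi"`, // strings are quoted, inner quotes escaped
	}

	fmt.Println(string(fields.MarshalBinary()))
	// keys come out sorted: count=3i,msg="say \"hi\"",value=0.64
}
```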
-type indexedSlice struct {
-	indices []int
-	b       []byte
+func appendField(b []byte, k string, v interface{}) []byte {
+	b = append(b, []byte(escape.String(k))...)
+	b = append(b, '=')
+
+	// check popular types first
+	switch v := v.(type) {
+	case float64:
+		b = strconv.AppendFloat(b, v, 'f', -1, 64)
+	case int64:
+		b = strconv.AppendInt(b, v, 10)
+		b = append(b, 'i')
+	case string:
+		b = append(b, '"')
+		b = append(b, []byte(EscapeStringField(v))...)
+		b = append(b, '"')
+	case bool:
+		b = strconv.AppendBool(b, v)
+	case int32:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case int16:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case int8:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case int:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case uint32:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case uint16:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case uint8:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	// TODO: 'uint' should be considered just as "dangerous" as a uint64,
+	// perhaps the value should be checked and capped at MaxInt64? We could
+	// then include uint64 as an accepted value
+	case uint:
+		b = strconv.AppendInt(b, int64(v), 10)
+		b = append(b, 'i')
+	case float32:
+		b = strconv.AppendFloat(b, float64(v), 'f', -1, 32)
+	case []byte:
+		b = append(b, v...)
+	case nil:
+		// skip
+	default:
+		// Can't determine the type, so convert to string
+		b = append(b, '"')
+		b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...)
+		b = append(b, '"')
+
+	}
+
+	return b
 }
 
-func (s *indexedSlice) Less(i, j int) bool {
-	_, a := scanTo(s.b, s.indices[i], '=')
-	_, b := scanTo(s.b, s.indices[j], '=')
-	return bytes.Compare(a, b) < 0
-}
+type byteSlices [][]byte
 
-func (s *indexedSlice) Swap(i, j int) {
-	s.indices[i], s.indices[j] = s.indices[j], s.indices[i]
-}
-
-func (s *indexedSlice) Len() int {
-	return len(s.indices)
-}
+func (a byteSlices) Len() int           { return len(a) }
+func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
+func (a byteSlices) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
diff --git a/vendor/github.com/influxdata/influxdb/models/rows.go b/vendor/github.com/influxdata/influxdb/models/rows.go
index 72435f5c70..528b3272b0 100644
--- a/vendor/github.com/influxdata/influxdb/models/rows.go
+++ b/vendor/github.com/influxdata/influxdb/models/rows.go
@@ -1,7 +1,6 @@
 package models
 
 import (
-	"hash/fnv"
 	"sort"
 )
 
@@ -11,7 +10,6 @@ type Row struct {
 	Tags    map[string]string `json:"tags,omitempty"`
 	Columns []string          `json:"columns,omitempty"`
 	Values  [][]interface{}   `json:"values,omitempty"`
-	Err     error             `json:"err,omitempty"`
 }
 
 // SameSeries returns true if r contains values for the same series as o.
@@ -21,7 +19,7 @@ func (r *Row) SameSeries(o *Row) bool {
 
 // tagsHash returns a hash of tag key/value pairs.
 func (r *Row) tagsHash() uint64 {
-	h := fnv.New64a()
+	h := NewInlineFNV64a()
 	keys := r.tagsKeys()
 	for _, k := range keys {
 		h.Write([]byte(k))
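`rows.go` above swaps `hash/fnv` for `NewInlineFNV64a`. The newly vendored `models/inline_fnv.go` is not shown in this diff, but it presumably amounts to the standard allocation-free FNV-64a fold, roughly:

```go
package main

import "fmt"

// Sketch only: the vendored inline_fnv.go is not part of this hunk;
// these are the standard FNV-64a parameters from hash/fnv.
const (
	fnvPrime       uint64 = 1099511628211
	fnvOffsetBasis uint64 = 14695981039346656037
)

// inlineFNV64a mirrors hash/fnv's 64a variant without the hash.Hash64
// interface allocation that tagsHash previously paid on every call.
type inlineFNV64a uint64

func newInlineFNV64a() inlineFNV64a { return inlineFNV64a(fnvOffsetBasis) }

// Write folds data into the running hash, byte by byte.
func (s *inlineFNV64a) Write(data []byte) (int, error) {
	h := uint64(*s)
	for _, c := range data {
		h ^= uint64(c)
		h *= fnvPrime
	}
	*s = inlineFNV64a(h)
	return len(data), nil
}

// Sum64 returns the current hash value.
func (s inlineFNV64a) Sum64() uint64 { return uint64(s) }

func main() {
	h := newInlineFNV64a()
	h.Write([]byte("host"))
	h.Write([]byte("server01"))
	fmt.Println(h.Sum64())
}
```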
diff --git a/vendor/github.com/influxdata/influxdb/models/statistic.go b/vendor/github.com/influxdata/influxdb/models/statistic.go
new file mode 100644
index 0000000000..33f8089948
--- /dev/null
+++ b/vendor/github.com/influxdata/influxdb/models/statistic.go
@@ -0,0 +1,40 @@
+package models
+
+// Statistic is a name, tag set, and value set reported by an internal
+// monitoring subsystem.
+type Statistic struct {
+	Name   string                 `json:"name"`
+	Tags   map[string]string      `json:"tags"`
+	Values map[string]interface{} `json:"values"`
+}
+
+// NewStatistic returns a Statistic with its Tags and Values maps initialized.
+func NewStatistic(name string) Statistic {
+	return Statistic{
+		Name:   name,
+		Tags:   make(map[string]string),
+		Values: make(map[string]interface{}),
+	}
+}
+
+// StatisticTags is a map that can be merged with others without causing
+// mutations to either map.
+type StatisticTags map[string]string
+
+// Merge creates a new map containing the merged contents of tags and t.
+// If both tags and the receiver map contain the same key, the value in tags
+// is used in the resulting map.
+//
+// Merge always returns a usable map.
+func (t StatisticTags) Merge(tags map[string]string) map[string]string {
+	// Add everything in tags to the result.
+	out := make(map[string]string, len(tags))
+	for k, v := range tags {
+		out[k] = v
+	}
+
+	// Only add values from t that don't appear in tags.
+	for k, v := range t {
+		if _, ok := tags[k]; !ok {
+			out[k] = v
+		}
+	}
+	return out
+}
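A quick sketch of the `StatisticTags.Merge` contract documented above (example values are mine, not vendored code):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
)

func main() {
	base := models.StatisticTags{"engine": "tsm1", "database": "db0"}

	// The argument wins on conflict; neither map is mutated.
	merged := base.Merge(map[string]string{"database": "db1"})

	fmt.Println(merged["database"], merged["engine"]) // db1 tsm1
	fmt.Println(base["database"])                     // db0
}
```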
diff --git a/vendor/github.com/influxdata/influxdb/models/time.go b/vendor/github.com/influxdata/influxdb/models/time.go
index 9e41577742..e98f2cb336 100644
--- a/vendor/github.com/influxdata/influxdb/models/time.go
+++ b/vendor/github.com/influxdata/influxdb/models/time.go
@@ -9,14 +9,36 @@ import (
 	"time"
 )
 
+const (
+	// MinNanoTime is the minimum time that can be represented.
+	//
+	// 1677-09-21 00:12:43.145224194 +0000 UTC
+	//
+	// The two smallest representable int64 values are reserved as sentinels.
+	// The minimum value needs to be usable as a value lower than any other
+	// value for comparisons, and a second, separate value is needed to act as
+	// a sentinel default that is unusable by the user but usable internally.
+	// Because these two values serve a special purpose, we do not allow users
+	// to write points at these two times.
+	MinNanoTime = int64(math.MinInt64) + 2
+
+	// MaxNanoTime is the maximum time that can be represented.
+	//
+	// 2262-04-11 23:47:16.854775806 +0000 UTC
+	//
+	// The highest time represented by a nanosecond needs to be used for an
+	// exclusive range in the shard group, so the maximum time needs to be one
+	// less than the possible maximum number of nanoseconds representable by an
+	// int64 so that we don't lose a point at that one time.
+	MaxNanoTime = int64(math.MaxInt64) - 1
+)
+
 var (
-	// MaxNanoTime is the maximum time that can be represented via int64 nanoseconds since the epoch.
-	MaxNanoTime = time.Unix(0, math.MaxInt64).UTC()
-	// MinNanoTime is the minumum time that can be represented via int64 nanoseconds since the epoch.
-	MinNanoTime = time.Unix(0, math.MinInt64).UTC()
+	minNanoTime = time.Unix(0, MinNanoTime).UTC()
+	maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
 
 	// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
-	ErrTimeOutOfRange = fmt.Errorf("time outside range %s - %s", MinNanoTime, MaxNanoTime)
+	ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
 )
 
 // SafeCalcTime safely calculates the time given. Will return error if the time is outside the
@@ -24,7 +46,8 @@ var (
 func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
 	mult := GetPrecisionMultiplier(precision)
 	if t, ok := safeSignedMult(timestamp, mult); ok {
-		return time.Unix(0, t).UTC(), nil
+		tme := time.Unix(0, t).UTC()
+		return tme, CheckTime(tme)
 	}
 
 	return time.Time{}, ErrTimeOutOfRange
@@ -32,7 +55,7 @@ func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
 
 // CheckTime checks that a time is within the safe range.
 func CheckTime(t time.Time) error {
-	if t.Before(MinNanoTime) || t.After(MaxNanoTime) {
+	if t.Before(minNanoTime) || t.After(maxNanoTime) {
 		return ErrTimeOutOfRange
 	}
 	return nil
@@ -43,7 +66,7 @@ func safeSignedMult(a, b int64) (int64, bool) {
 	if a == 0 || b == 0 || a == 1 || b == 1 {
 		return a * b, true
 	}
-	if a == math.MinInt64 || b == math.MaxInt64 {
+	if a == MinNanoTime || b == MaxNanoTime {
 		return 0, false
 	}
 	c := a * b
diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go b/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go
index 15e9cf29d5..5b6ba15aab 100644
--- a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go
+++ b/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go
@@ -1,6 +1,9 @@
 package escape
 
-import "bytes"
+import (
+	"bytes"
+	"strings"
+)
 
 func Bytes(in []byte) []byte {
 	for b, esc := range Codes {
@@ -9,7 +12,54 @@ func Bytes(in []byte) []byte {
 	return in
 }
 
+const escapeChars = `," =`
+
+// IsEscaped reports whether b contains at least one escaped character.
+func IsEscaped(b []byte) bool {
+	for len(b) > 0 {
+		i := bytes.IndexByte(b, '\\')
+		if i < 0 {
+			return false
+		}
+
+		if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
+			return true
+		}
+		b = b[i+1:]
+	}
+	return false
+}
+
+// AppendUnescaped appends the unescaped version of src to dst and
+// returns the result.
+func AppendUnescaped(dst, src []byte) []byte {
+	var pos int
+	for len(src) > 0 {
+		next := bytes.IndexByte(src[pos:], '\\')
+		if next < 0 || pos+next+1 >= len(src) {
+			return append(dst, src...)
+		}
+
+		if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
+			if pos+next > 0 {
+				dst = append(dst, src[:pos+next]...)
+			}
+			src = src[pos+next+1:]
+			pos = 0
+		} else {
+			pos += next + 1
+		}
+	}
+
+	return dst
+}
+
 func Unescape(in []byte) []byte {
+	if len(in) == 0 {
+		return nil
+	}
+
+	// Fast path: nothing to unescape.
+	if bytes.IndexByte(in, '\\') == -1 {
+		return in
+	}
+
 	i := 0
 	inLen := len(in)
 	var out []byte
diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go b/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go
index 330fbf4226..d391142869 100644
--- a/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go
+++ b/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go
@@ -20,6 +20,10 @@ func init() {
 }
 
 func UnescapeString(in string) string {
+	// Fast path: nothing to unescape.
+	if strings.IndexByte(in, '\\') == -1 {
+		return in
+	}
+
 	for b, esc := range codesStr {
 		in = strings.Replace(in, esc, b, -1)
 	}
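Finally, a hypothetical demonstration of the new escape helpers and fast paths (example inputs are mine):

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/escape"
)

func main() {
	// `\ ` is one of the line-protocol escape sequences (the set is `," =`).
	in := []byte(`host\ name`)

	fmt.Println(escape.IsEscaped(in))                    // true
	fmt.Println(string(escape.AppendUnescaped(nil, in))) // host name

	// The fast path hands back backslash-free input untouched, avoiding
	// the byte-by-byte copy the old Unescape made even for clean input.
	fmt.Println(string(escape.Unescape([]byte("plain")))) // plain
}
```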