mirror of https://github.com/prometheus/prometheus
Compress remote storage requests and responses with unframed/raw snappy. (#2696)
* Compress remote storage requests and responses with unframed/raw snappy, for compatibility with other languages. * Remove backwards compatibility code from remote_storage_adapter, update example_write_adapter * Add /documentation/examples/remote_storage/example_write_adapter/example_writer_adapter to .gitignorepull/2708/head
parent
14d0604aba
commit
3141a6b36b
|
@ -25,3 +25,5 @@ benchmark.txt
|
|||
!/circle.yml
|
||||
!/.travis.yml
|
||||
!/.promu.yml
|
||||
/documentation/examples/remote_storage/remote_storage_adapter/remote_storage_adapter
|
||||
/documentation/examples/remote_storage/example_write_adapter/example_writer_adapter
|
||||
|
|
|
@ -27,7 +27,13 @@ import (
|
|||
|
||||
func main() {
|
||||
http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) {
|
||||
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
|
|
|
@ -184,7 +184,13 @@ func buildClients(cfg *config) ([]writer, []reader) {
|
|||
|
||||
func serve(addr string, writers []writer, readers []reader) error {
|
||||
http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
|
||||
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
|
@ -211,7 +217,13 @@ func serve(addr string, writers []writer, readers []reader) error {
|
|||
})
|
||||
|
||||
http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) {
|
||||
reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body))
|
||||
compressed, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
|
@ -245,7 +257,10 @@ func serve(addr string, writers []writer, readers []reader) error {
|
|||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
if _, err := snappy.NewWriter(w).Write(data); err != nil {
|
||||
w.Header().Set("Content-Encoding", "snappy")
|
||||
|
||||
compressed = snappy.Encode(nil, data)
|
||||
if _, err := w.Write(compressed); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -94,12 +94,8 @@ func (c *Client) Store(samples model.Samples) error {
|
|||
return err
|
||||
}
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
|
||||
compressed := snappy.Encode(nil, data)
|
||||
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(compressed))
|
||||
if err != nil {
|
||||
// Errors from NewRequest are from unparseable URLs, so are not
|
||||
// recoverable.
|
||||
|
@ -107,7 +103,7 @@ func (c *Client) Store(samples model.Samples) error {
|
|||
}
|
||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
||||
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.0.1")
|
||||
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
defer cancel()
|
||||
|
@ -151,17 +147,14 @@ func (c *Client) Read(ctx context.Context, from, through model.Time, matchers me
|
|||
return nil, fmt.Errorf("unable to marshal read request: %v", err)
|
||||
}
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
|
||||
compressed := snappy.Encode(nil, data)
|
||||
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(compressed))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create request: %v", err)
|
||||
}
|
||||
httpReq.Header.Add("Content-Encoding", "snappy")
|
||||
httpReq.Header.Set("Content-Type", "application/x-protobuf")
|
||||
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.0.1")
|
||||
httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
defer cancel()
|
||||
|
@ -175,12 +168,18 @@ func (c *Client) Read(ctx context.Context, from, through model.Time, matchers me
|
|||
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
|
||||
}
|
||||
|
||||
if data, err = ioutil.ReadAll(snappy.NewReader(httpResp.Body)); err != nil {
|
||||
compressed, err = ioutil.ReadAll(httpResp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading response: %v", err)
|
||||
}
|
||||
|
||||
uncompressed, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading response: %v", err)
|
||||
}
|
||||
|
||||
var resp ReadResponse
|
||||
err = proto.Unmarshal(data, &resp)
|
||||
err = proto.Unmarshal(uncompressed, &resp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue