retrieval: handle GZIP compression ourselves

The automatic GZIP handling of net/http does not preserve
buffers across requests and thus generates a lot of garbage.
We handle GZIP ourselves to circumvent this.
pull/2643/head
Fabian Reinartz 8 years ago
parent 311e7b5069
commit cc0ff26f1f
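
As a standalone illustration of the pattern this patch applies: one `gzip.Reader` and one `bufio.Reader` live for the lifetime of the scraper and are `Reset` against each new response body, so steady-state scraping allocates no new decompression buffers. The `decompressor` type and the round-trip in `main` below are illustrative, not part of the patch.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// decompressor reuses one gzip.Reader and one bufio.Reader across calls,
// mirroring what the patch does per scrape target.
type decompressor struct {
	gzipr *gzip.Reader
	buf   *bufio.Reader
}

func (d *decompressor) copyTo(w io.Writer, body io.Reader) error {
	var err error
	if d.gzipr == nil {
		// First call: allocate the readers once.
		d.buf = bufio.NewReader(body)
		if d.gzipr, err = gzip.NewReader(d.buf); err != nil {
			return err
		}
	} else {
		// Later calls: retarget the existing readers, no new buffers.
		d.buf.Reset(body)
		if err = d.gzipr.Reset(d.buf); err != nil {
			return err
		}
	}
	_, err = io.Copy(w, d.gzipr)
	return err
}

func main() {
	// Compress two payloads and decompress both through the same readers.
	var d decompressor
	for i := 0; i < 2; i++ {
		var compressed, out bytes.Buffer
		zw := gzip.NewWriter(&compressed)
		fmt.Fprintf(zw, "scrape %d\n", i)
		zw.Close()

		if err := d.copyTo(&out, &compressed); err != nil {
			panic(err)
		}
		fmt.Print(out.String())
	}
}
```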

@@ -14,12 +14,16 @@
 package retrieval
 
 import (
+	"bufio"
 	"bytes"
+	"compress/gzip"
 	"fmt"
 	"io"
 	"net/http"
+	"reflect"
 	"sync"
 	"time"
+	"unsafe"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/log"
@@ -347,35 +351,30 @@ type scraper interface {
 // targetScraper implements the scraper interface for a target.
 type targetScraper struct {
 	*Target
-	client *http.Client
-}
 
-const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
+	client *http.Client
+	req    *http.Request
 
-var scrapeBufPool = sync.Pool{}
+	gzipr *gzip.Reader
+	buf   *bufio.Reader
+}
 
-func getScrapeBuf() []byte {
-	b := scrapeBufPool.Get()
-	if b == nil {
-		return make([]byte, 0, 8192)
-	}
-	return b.([]byte)
-}
-
-func putScrapeBuf(b []byte) {
-	b = b[:0]
-	scrapeBufPool.Put(b)
-}
+const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
 
 func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
-	req, err := http.NewRequest("GET", s.URL().String(), nil)
-	if err != nil {
-		return err
+	if s.req == nil {
+		req, err := http.NewRequest("GET", s.URL().String(), nil)
+		if err != nil {
+			return err
+		}
+		// Disable accept header to always negotiate for text format.
+		// req.Header.Add("Accept", acceptHeader)
+		req.Header.Add("Accept-Encoding", "gzip")
+
+		s.req = req
 	}
-	// Disable accept header to always negotiate for text format.
-	// req.Header.Add("Accept", acceptHeader)
 
-	resp, err := ctxhttp.Do(ctx, s.client, req)
+	resp, err := ctxhttp.Do(ctx, s.client, s.req)
 	if err != nil {
 		return err
 	}
@@ -385,10 +384,27 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
 		return fmt.Errorf("server returned HTTP status %s", resp.Status)
 	}
 
-	_, err = io.Copy(w, resp.Body)
-	return err
+	if resp.Header.Get("Content-Encoding") != "gzip" {
+		_, err = io.Copy(w, resp.Body)
+		return err
+	}
+
+	if s.gzipr == nil {
+		s.buf = bufio.NewReader(resp.Body)
+		s.gzipr, err = gzip.NewReader(s.buf)
+		if err != nil {
+			return err
+		}
+	} else {
+		s.buf.Reset(resp.Body)
+		s.gzipr.Reset(s.buf)
+	}
+
+	_, err = io.Copy(w, s.gzipr)
+	s.gzipr.Close()
+	return err
 }
 
 // A loop can run and be stopped again. It must not be reused after it was stopped.
 type loop interface {
 	run(interval, timeout time.Duration, errc chan<- error)
@@ -436,7 +452,10 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
+	buf := bytes.NewBuffer(make([]byte, 0, 16000))
+
 	for {
+		buf.Reset()
 		select {
 		case <-sl.ctx.Done():
 			return
@@ -456,8 +475,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 			)
 		}
 
-		buf := bytes.NewBuffer(getScrapeBuf())
-
 		err := sl.scraper.scrape(scrapeCtx, buf)
 		if err == nil {
 			b := buf.Bytes()
@@ -465,7 +482,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 			if total, added, err = sl.append(b, start); err != nil {
 				log.With("err", err).Error("append failed")
 			}
-			putScrapeBuf(b)
 		} else if errc != nil {
 			errc <- err
 		}
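
The run loop now owns a single `bytes.Buffer` that is `Reset` at the top of each iteration, replacing the `sync.Pool` checkout/checkin. A minimal sketch of that pattern, assuming the patch's 16000-byte initial capacity (the loop body here is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// One buffer for the lifetime of the loop; Reset drops the length to
	// zero but keeps the backing array, so steady-state iterations reuse
	// the same allocation.
	buf := bytes.NewBuffer(make([]byte, 0, 16000))

	for i := 0; i < 3; i++ {
		buf.Reset() // length to zero, capacity retained
		fmt.Fprintf(buf, "scrape payload %d", i)
		fmt.Println(buf.Len(), cap(buf.Bytes()))
	}
}
```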
@@ -524,7 +540,7 @@ loop:
 			t = *tp
 		}
 
-		mets := string(met)
+		mets := yoloString(met)
 		ref, ok := sl.cache[mets]
 		if ok {
 			switch err = app.AddFast(ref, t, v); err {
@@ -550,6 +566,8 @@ loop:
 			default:
 				break loop
 			}
+			// Allocate a real string.
+			mets = string(met)
 			sl.cache[mets] = ref
 		}
 		added++
@@ -567,6 +585,16 @@ loop:
 	return total, added, nil
 }
 
+func yoloString(b []byte) string {
+	sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+
+	h := reflect.StringHeader{
+		Data: sh.Data,
+		Len:  sh.Len,
+	}
+	return *((*string)(unsafe.Pointer(&h)))
+}
+
 func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
 	sl.scraper.report(start, duration, err)
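
`yoloString` reinterprets the scraped metric bytes as a string without copying, so the cache lookup on the hot path allocates nothing; a real string is only allocated on a cache miss, where the map key must outlive the byte slice. A sketch of that lookup pattern, mirroring the patch's reflect-header approach (which predates today's `unsafe.String` helpers); the cache shape and metric literal are illustrative:

```go
package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

// yoloString views b as a string without copying. The result is only
// valid while b is alive and unmodified, so it must never be stored.
func yoloString(b []byte) string {
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	h := reflect.StringHeader{
		Data: sh.Data,
		Len:  sh.Len,
	}
	return *((*string)(unsafe.Pointer(&h)))
}

func main() {
	cache := map[string]uint64{}
	met := []byte(`http_requests_total{code="200"}`)

	// Hot path: look up with the zero-copy view, no allocation.
	if ref, ok := cache[yoloString(met)]; ok {
		fmt.Println("cache hit:", ref)
		return
	}
	// Miss: allocate a real string before storing it as a map key,
	// because the yolo view would dangle once met's buffer is reused.
	cache[string(met)] = 1
	fmt.Println("cached", len(cache), "entry")
}
```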

@@ -81,6 +81,7 @@ func NewHTTPClient(cfg config.HTTPClientConfig) (*http.Client, error) {
 		Proxy:              http.ProxyURL(cfg.ProxyURL.URL),
 		MaxIdleConns:       10000,
 		TLSClientConfig:    tlsConfig,
+		DisableCompression: true,
 	}
 
 	// If a bearer token is provided, create a round tripper that will set the
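
`DisableCompression` stops the `http.Transport` from negotiating gzip on its own and transparently wrapping each response body in a fresh decompressor, which is exactly the per-request garbage the commit message describes; the scraper now sends `Accept-Encoding: gzip` and decompresses itself. A minimal sketch of such a client setup (every field except `DisableCompression` is illustrative):

```go
package main

import (
	"crypto/tls"
	"net/http"
)

func main() {
	// With DisableCompression set, the Transport neither requests gzip
	// nor decompresses responses, leaving both to the caller.
	transport := &http.Transport{
		MaxIdleConns:       10000,
		TLSClientConfig:    &tls.Config{},
		DisableCompression: true,
	}
	client := &http.Client{Transport: transport}
	_ = client // the scraper then sets Accept-Encoding: gzip itself
}
```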
