remote/read_handler: pool input to Marshal() (#11357)

* remote/read_handler: pool input to Marshal()

Use a sync.Pool to reuse byte slices between calls to Marshal() in the
remote read handler.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* remote: add microbenchmark for remote read handler

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
pull/11593/head
Giedrius Statkevičius 2 years ago committed by GitHub
parent 8553a98267
commit d1d2566055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,5 +13,22 @@
package prompb
import (
"sync"
)
func (m Sample) T() int64 { return m.Timestamp }
func (m Sample) V() float64 { return m.Value }
func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) {
size := r.Size()
data, ok := p.Get().(*[]byte)
if ok && cap(*data) >= size {
n, err := r.MarshalToSizedBuffer((*data)[:size])
if err != nil {
return nil, err
}
return (*data)[:n], nil
}
return r.Marshal()
}

@ -20,6 +20,7 @@ import (
"net/http"
"sort"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@ -193,6 +194,7 @@ func StreamChunkedReadResponses(
ss storage.ChunkSeriesSet,
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
marshalPool *sync.Pool,
) (storage.Warnings, error) {
var (
chks []prompb.Chunk
@ -234,12 +236,14 @@ func StreamChunkedReadResponses(
continue
}
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
resp := &prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{
{Labels: lbls, Chunks: chks},
},
QueryIndex: queryIndex,
})
}
b, err := resp.PooledMarshal(marshalPool)
if err != nil {
return ss.Warnings(), fmt.Errorf("marshal ChunkedReadResponse: %w", err)
}
@ -247,6 +251,9 @@ func StreamChunkedReadResponses(
if _, err := stream.Write(b); err != nil {
return ss.Warnings(), fmt.Errorf("write to stream: %w", err)
}
// We immediately flush the Write() so it is safe to return to the pool.
marshalPool.Put(&b)
chks = chks[:0]
}
if err := iter.Err(); err != nil {

@ -17,6 +17,7 @@ import (
"context"
"net/http"
"sort"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -37,6 +38,7 @@ type readHandler struct {
remoteReadMaxBytesInFrame int
remoteReadGate *gate.Gate
queries prometheus.Gauge
marshalPool *sync.Pool
}
// NewReadHandler creates a http.Handler that accepts remote read requests and
@ -49,6 +51,7 @@ func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storag
remoteReadSampleLimit: remoteReadSampleLimit,
remoteReadGate: gate.New(remoteReadConcurrencyLimit),
remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
marshalPool: &sync.Pool{},
queries: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "prometheus",
@ -225,6 +228,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
querier.Select(true, hints, filteredMatchers...),
sortedExternalLabels,
h.remoteReadMaxBytesInFrame,
h.marshalPool,
)
if err != nil {
return err

@ -107,6 +107,73 @@ func TestSampledReadEndpoint(t *testing.T) {
}, resp.Results[0])
}
func BenchmarkStreamReadEndpoint(b *testing.B) {
suite, err := promql.NewTest(b, `
load 1m
test_metric1{foo="bar1",baz="qux"} 0+100x119
test_metric1{foo="bar2",baz="qux"} 0+100x120
test_metric1{foo="bar3",baz="qux"} 0+100x240
`)
require.NoError(b, err)
defer suite.Close()
require.NoError(b, suite.Run())
api := NewReadHandler(nil, nil, suite.Storage(), func() config.Config {
return config.Config{}
},
0, 1, 0,
)
matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
require.NoError(b, err)
query, err := ToQuery(0, 14400001, []*labels.Matcher{matcher}, &storage.SelectHints{
Step: 1,
Func: "sum",
Start: 0,
End: 14400001,
})
require.NoError(b, err)
req := &prompb.ReadRequest{
Queries: []*prompb.Query{query},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
}
data, err := proto.Marshal(req)
require.NoError(b, err)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
compressed := snappy.Encode(nil, data)
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
require.NoError(b, err)
recorder := httptest.NewRecorder()
api.ServeHTTP(recorder, request)
require.Equal(b, 2, recorder.Code/100)
var results []*prompb.ChunkedReadResponse
stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil)
for {
res := &prompb.ChunkedReadResponse{}
err := stream.NextProto(res)
if err == io.EOF {
break
}
require.NoError(b, err)
results = append(results, res)
}
require.Equal(b, 6, len(results), "Expected 6 results.")
}
}
func TestStreamReadEndpoint(t *testing.T) {
// First with 120 samples. We expect 1 frame with 1 chunk.
// Second with 121 samples, We expect 1 frame with 2 chunks.

Loading…
Cancel
Save