diff --git a/prompb/custom.go b/prompb/custom.go index 0b3820c4d..4b07187bd 100644 --- a/prompb/custom.go +++ b/prompb/custom.go @@ -13,5 +13,22 @@ package prompb +import ( + "sync" +) + func (m Sample) T() int64 { return m.Timestamp } func (m Sample) V() float64 { return m.Value } + +func (r *ChunkedReadResponse) PooledMarshal(p *sync.Pool) ([]byte, error) { + size := r.Size() + data, ok := p.Get().(*[]byte) + if ok && cap(*data) >= size { + n, err := r.MarshalToSizedBuffer((*data)[:size]) + if err != nil { + return nil, err + } + return (*data)[:n], nil + } + return r.Marshal() +} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 9b6f5a5ab..48c2d8615 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -20,6 +20,7 @@ import ( "net/http" "sort" "strings" + "sync" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -193,6 +194,7 @@ func StreamChunkedReadResponses( ss storage.ChunkSeriesSet, sortedExternalLabels []prompb.Label, maxBytesInFrame int, + marshalPool *sync.Pool, ) (storage.Warnings, error) { var ( chks []prompb.Chunk @@ -234,12 +236,14 @@ func StreamChunkedReadResponses( continue } - b, err := proto.Marshal(&prompb.ChunkedReadResponse{ + resp := &prompb.ChunkedReadResponse{ ChunkedSeries: []*prompb.ChunkedSeries{ {Labels: lbls, Chunks: chks}, }, QueryIndex: queryIndex, - }) + } + + b, err := resp.PooledMarshal(marshalPool) if err != nil { return ss.Warnings(), fmt.Errorf("marshal ChunkedReadResponse: %w", err) } @@ -247,6 +251,9 @@ func StreamChunkedReadResponses( if _, err := stream.Write(b); err != nil { return ss.Warnings(), fmt.Errorf("write to stream: %w", err) } + + // We immediately flush the Write() so it is safe to return to the pool. + marshalPool.Put(&b) chks = chks[:0] } if err := iter.Err(); err != nil { diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go index e1f1df21c..116eb9596 100644 --- a/storage/remote/read_handler.go +++ b/storage/remote/read_handler.go @@ -17,6 +17,7 @@ import ( "context" "net/http" "sort" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -37,6 +38,7 @@ type readHandler struct { remoteReadMaxBytesInFrame int remoteReadGate *gate.Gate queries prometheus.Gauge + marshalPool *sync.Pool } // NewReadHandler creates a http.Handler that accepts remote read requests and @@ -49,6 +51,7 @@ func NewReadHandler(logger log.Logger, r prometheus.Registerer, queryable storag remoteReadSampleLimit: remoteReadSampleLimit, remoteReadGate: gate.New(remoteReadConcurrencyLimit), remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame, + marshalPool: &sync.Pool{}, queries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "prometheus", @@ -225,6 +228,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re querier.Select(true, hints, filteredMatchers...), sortedExternalLabels, h.remoteReadMaxBytesInFrame, + h.marshalPool, ) if err != nil { return err diff --git a/storage/remote/read_handler_test.go b/storage/remote/read_handler_test.go index 03e4b706b..ba2517a40 100644 --- a/storage/remote/read_handler_test.go +++ b/storage/remote/read_handler_test.go @@ -107,6 +107,73 @@ func TestSampledReadEndpoint(t *testing.T) { }, resp.Results[0]) } +func BenchmarkStreamReadEndpoint(b *testing.B) { + suite, err := promql.NewTest(b, ` + load 1m + test_metric1{foo="bar1",baz="qux"} 0+100x119 + test_metric1{foo="bar2",baz="qux"} 0+100x120 + test_metric1{foo="bar3",baz="qux"} 0+100x240 +`) + require.NoError(b, err) + + defer suite.Close() + + require.NoError(b, suite.Run()) + + api := NewReadHandler(nil, nil, suite.Storage(), func() config.Config { + return config.Config{} + }, + 0, 1, 0, + ) + + matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1") + require.NoError(b, err) + + query, err := ToQuery(0, 14400001, []*labels.Matcher{matcher}, &storage.SelectHints{ + Step: 1, + Func: "sum", + Start: 0, + End: 14400001, + }) + require.NoError(b, err) + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{query}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + } + data, err := proto.Marshal(req) + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + compressed := snappy.Encode(nil, data) + request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed)) + require.NoError(b, err) + + recorder := httptest.NewRecorder() + api.ServeHTTP(recorder, request) + + require.Equal(b, 2, recorder.Code/100) + + var results []*prompb.ChunkedReadResponse + stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) + + for { + res := &prompb.ChunkedReadResponse{} + err := stream.NextProto(res) + if err == io.EOF { + break + } + require.NoError(b, err) + results = append(results, res) + } + + require.Equal(b, 6, len(results), "Expected 6 results.") + } +} + func TestStreamReadEndpoint(t *testing.T) { // First with 120 samples. We expect 1 frame with 1 chunk. // Second with 121 samples, We expect 1 frame with 2 chunks.