diff --git a/config/config.go b/config/config.go index d1e463d86..c9e8efbf3 100644 --- a/config/config.go +++ b/config/config.go @@ -221,6 +221,7 @@ var ( // DefaultRemoteReadConfig is the default remote read configuration. DefaultRemoteReadConfig = RemoteReadConfig{ RemoteTimeout: model.Duration(1 * time.Minute), + ChunkedReadLimit: DefaultChunkedReadLimit, HTTPClientConfig: config.DefaultHTTPClientConfig, FilterExternalLabels: true, } @@ -1279,13 +1280,20 @@ type MetadataConfig struct { MaxSamplesPerSend int `yaml:"max_samples_per_send,omitempty"` } +const ( + // DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows. + // 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset. + DefaultChunkedReadLimit = 5e+7 +) + // RemoteReadConfig is the configuration for reading from remote storage. type RemoteReadConfig struct { - URL *config.URL `yaml:"url"` - RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - Headers map[string]string `yaml:"headers,omitempty"` - ReadRecent bool `yaml:"read_recent,omitempty"` - Name string `yaml:"name,omitempty"` + URL *config.URL `yaml:"url"` + RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` + ChunkedReadLimit uint64 `yaml:"chunked_read_limit,omitempty"` + Headers map[string]string `yaml:"headers,omitempty"` + ReadRecent bool `yaml:"read_recent,omitempty"` + Name string `yaml:"name,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/config/config_test.go b/config/config_test.go index 774aba1e2..221906182 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -165,10 +165,11 @@ var expectedConf = &Config{ RemoteReadConfigs: []*RemoteReadConfig{ { - URL: mustParseURL("http://remote1/read"), - RemoteTimeout: model.Duration(1 * time.Minute), - ReadRecent: true, - Name: "default", + URL: mustParseURL("http://remote1/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ChunkedReadLimit: DefaultChunkedReadLimit, + ReadRecent: true, + Name: "default", HTTPClientConfig: config.HTTPClientConfig{ FollowRedirects: true, EnableHTTP2: false, @@ -178,6 +179,7 @@ var expectedConf = &Config{ { URL: mustParseURL("http://remote3/read"), RemoteTimeout: model.Duration(1 * time.Minute), + ChunkedReadLimit: DefaultChunkedReadLimit, ReadRecent: false, Name: "read_special", RequiredMatchers: model.LabelSet{"job": "special"}, diff --git a/storage/remote/chunked.go b/storage/remote/chunked.go index 96ce483e0..aa5addd6a 100644 --- a/storage/remote/chunked.go +++ b/storage/remote/chunked.go @@ -26,10 +26,6 @@ import ( "github.com/gogo/protobuf/proto" ) -// DefaultChunkedReadLimit is the default value for the maximum size of the protobuf frame client allows. -// 50MB is the default. This is equivalent to ~100k full XOR chunks and average labelset. -const DefaultChunkedReadLimit = 5e+7 - // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it // before. diff --git a/storage/remote/client.go b/storage/remote/client.go index 2a66739ed..62218cfba 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -16,6 +16,7 @@ package remote import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -36,13 +37,14 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/remote/azuread" "github.com/prometheus/prometheus/storage/remote/googleiam" ) -const maxErrMsgLen = 1024 - const ( + maxErrMsgLen = 1024 + RemoteWriteVersionHeader = "X-Prometheus-Remote-Write-Version" RemoteWriteVersion1HeaderValue = "0.1.0" RemoteWriteVersion20HeaderValue = "2.0.0" @@ -68,9 +70,12 @@ var ( config.RemoteWriteProtoMsgV1: appProtoContentType, // Also application/x-protobuf;proto=prometheus.WriteRequest but simplified for compatibility with 1.x spec. config.RemoteWriteProtoMsgV2: appProtoContentType + ";proto=io.prometheus.write.v2.Request", } -) -var ( + AcceptedResponseTypes = []prompb.ReadRequest_ResponseType{ + prompb.ReadRequest_STREAMED_XOR_CHUNKS, + prompb.ReadRequest_SAMPLES, + } + remoteReadQueriesTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -78,7 +83,7 @@ var ( Name: "read_queries_total", Help: "The total number of remote read queries.", }, - []string{remoteName, endpoint, "code"}, + []string{remoteName, endpoint, "response_type", "code"}, ) remoteReadQueries = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -94,13 +99,13 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "read_request_duration_seconds", - Help: "Histogram of the latency for remote read requests.", + Help: "Histogram of the latency for remote read requests. Note that for streamed responses this is only the duration of the initial call and does not include the processing of the stream.", Buckets: append(prometheus.DefBuckets, 25, 60), NativeHistogramBucketFactor: 1.1, NativeHistogramMaxBucketNumber: 100, NativeHistogramMinResetDuration: 1 * time.Hour, }, - []string{remoteName, endpoint}, + []string{remoteName, endpoint, "response_type"}, ) ) @@ -116,10 +121,11 @@ type Client struct { timeout time.Duration retryOnRateLimit bool + chunkedReadLimit uint64 readQueries prometheus.Gauge readQueriesTotal *prometheus.CounterVec - readQueriesDuration prometheus.Observer + readQueriesDuration prometheus.ObserverVec writeProtoMsg config.RemoteWriteProtoMsg writeCompression Compression // Not exposed by ClientConfig for now. @@ -136,12 +142,13 @@ type ClientConfig struct { Headers map[string]string RetryOnRateLimit bool WriteProtoMsg config.RemoteWriteProtoMsg + ChunkedReadLimit uint64 } -// ReadClient uses the SAMPLES method of remote read to read series samples from remote server. -// TODO(bwplotka): Add streamed chunked remote read method as well (https://github.com/prometheus/prometheus/issues/5926). +// ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can +// also fall back to the SAMPLES method if necessary. type ReadClient interface { - Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) + Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) } // NewReadClient creates a new client for remote read. @@ -162,9 +169,10 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) { urlString: conf.URL.String(), Client: httpClient, timeout: time.Duration(conf.Timeout), + chunkedReadLimit: conf.ChunkedReadLimit, readQueries: remoteReadQueries.WithLabelValues(name, conf.URL.String()), readQueriesTotal: remoteReadQueriesTotal.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), - readQueriesDuration: remoteReadQueryDuration.WithLabelValues(name, conf.URL.String()), + readQueriesDuration: remoteReadQueryDuration.MustCurryWith(prometheus.Labels{remoteName: name, endpoint: conf.URL.String()}), }, nil } @@ -278,8 +286,8 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) (WriteRespo return WriteResponseStats{}, RecoverableError{err, defaultBackoff} } defer func() { - io.Copy(io.Discard, httpResp.Body) - httpResp.Body.Close() + _, _ = io.Copy(io.Discard, httpResp.Body) + _ = httpResp.Body.Close() }() // TODO(bwplotka): Pass logger and emit debug on error? @@ -329,17 +337,17 @@ func (c *Client) Endpoint() string { return c.urlString } -// Read reads from a remote endpoint. -func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) { +// Read reads from a remote endpoint. The sortSeries parameter is only respected in the case of a sampled response; +// chunked responses arrive already sorted by the server. +func (c *Client) Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) { c.readQueries.Inc() defer c.readQueries.Dec() req := &prompb.ReadRequest{ // TODO: Support batching multiple queries into one read request, // as the protobuf interface allows for it. - Queries: []*prompb.Query{ - query, - }, + Queries: []*prompb.Query{query}, + AcceptedResponseTypes: AcceptedResponseTypes, } data, err := proto.Marshal(req) if err != nil { @@ -358,7 +366,6 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() ctx, span := otel.Tracer("").Start(ctx, "Remote Read", trace.WithSpanKind(trace.SpanKindClient)) defer span.End() @@ -366,23 +373,57 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe start := time.Now() httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) if err != nil { + cancel() return nil, fmt.Errorf("error sending request: %w", err) } - defer func() { - io.Copy(io.Discard, httpResp.Body) - httpResp.Body.Close() - }() - c.readQueriesDuration.Observe(time.Since(start).Seconds()) - c.readQueriesTotal.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc() - compressed, err = io.ReadAll(httpResp.Body) - if err != nil { - return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err) + if httpResp.StatusCode/100 != 2 { + // Make an attempt at getting an error message. + body, _ := io.ReadAll(httpResp.Body) + _ = httpResp.Body.Close() + + cancel() + return nil, fmt.Errorf("remote server %s returned http status %s: %s", c.urlString, httpResp.Status, string(body)) } - if httpResp.StatusCode/100 != 2 { - return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.urlString, httpResp.Status, strings.TrimSpace(string(compressed))) + contentType := httpResp.Header.Get("Content-Type") + + switch { + case strings.HasPrefix(contentType, "application/x-protobuf"): + c.readQueriesDuration.WithLabelValues("sampled").Observe(time.Since(start).Seconds()) + c.readQueriesTotal.WithLabelValues("sampled", strconv.Itoa(httpResp.StatusCode)).Inc() + ss, err := c.handleSampledResponse(req, httpResp, sortSeries) + cancel() + return ss, err + case strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"): + c.readQueriesDuration.WithLabelValues("chunked").Observe(time.Since(start).Seconds()) + + s := NewChunkedReader(httpResp.Body, c.chunkedReadLimit, nil) + return NewChunkedSeriesSet(s, httpResp.Body, query.StartTimestampMs, query.EndTimestampMs, func(err error) { + code := strconv.Itoa(httpResp.StatusCode) + if !errors.Is(err, io.EOF) { + code = "aborted_stream" + } + c.readQueriesTotal.WithLabelValues("chunked", code).Inc() + cancel() + }), nil + default: + c.readQueriesDuration.WithLabelValues("unsupported").Observe(time.Since(start).Seconds()) + c.readQueriesTotal.WithLabelValues("unsupported", strconv.Itoa(httpResp.StatusCode)).Inc() + cancel() + return nil, fmt.Errorf("unsupported content type: %s", contentType) + } +} + +func (c *Client) handleSampledResponse(req *prompb.ReadRequest, httpResp *http.Response, sortSeries bool) (storage.SeriesSet, error) { + compressed, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err) } + defer func() { + _, _ = io.Copy(io.Discard, httpResp.Body) + _ = httpResp.Body.Close() + }() uncompressed, err := snappy.Decode(nil, compressed) if err != nil { @@ -399,5 +440,8 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) } - return resp.Results[0], nil + // This client does not batch queries so there's always only 1 result. + res := resp.Results[0] + + return FromQueryResult(sortSeries, res), nil } diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 9184ce100..c8b3d487e 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -23,9 +23,15 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/tsdb/chunkenc" ) var longErrMessage = strings.Repeat("error message", maxErrMsgLen) @@ -208,3 +214,226 @@ func TestClientCustomHeaders(t *testing.T) { require.True(t, called, "The remote server wasn't called") } + +func TestReadClient(t *testing.T) { + tests := []struct { + name string + query *prompb.Query + httpHandler http.HandlerFunc + expectedLabels []map[string]string + expectedSamples [][]model.SamplePair + expectedErrorContains string + sortSeries bool + }{ + { + name: "sorted sampled response", + httpHandler: sampledResponseHTTPHandler(t), + expectedLabels: []map[string]string{ + {"foo1": "bar"}, + {"foo2": "bar"}, + }, + expectedSamples: [][]model.SamplePair{ + { + {Timestamp: model.Time(0), Value: model.SampleValue(3)}, + {Timestamp: model.Time(5), Value: model.SampleValue(4)}, + }, + { + {Timestamp: model.Time(0), Value: model.SampleValue(1)}, + {Timestamp: model.Time(5), Value: model.SampleValue(2)}, + }, + }, + expectedErrorContains: "", + sortSeries: true, + }, + { + name: "unsorted sampled response", + httpHandler: sampledResponseHTTPHandler(t), + expectedLabels: []map[string]string{ + {"foo2": "bar"}, + {"foo1": "bar"}, + }, + expectedSamples: [][]model.SamplePair{ + { + {Timestamp: model.Time(0), Value: model.SampleValue(1)}, + {Timestamp: model.Time(5), Value: model.SampleValue(2)}, + }, + { + {Timestamp: model.Time(0), Value: model.SampleValue(3)}, + {Timestamp: model.Time(5), Value: model.SampleValue(4)}, + }, + }, + expectedErrorContains: "", + sortSeries: false, + }, + { + name: "chunked response", + query: &prompb.Query{ + StartTimestampMs: 4000, + EndTimestampMs: 12000, + }, + httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + + flusher, ok := w.(http.Flusher) + require.True(t, ok) + + cw := NewChunkedWriter(w, flusher) + l := []prompb.Label{ + {Name: "foo", Value: "bar"}, + } + + chunks := buildTestChunks(t) + for i, c := range chunks { + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + readResp := prompb.ChunkedReadResponse{ + ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, + QueryIndex: int64(i), + } + + b, err := proto.Marshal(&readResp) + require.NoError(t, err) + + _, err = cw.Write(b) + require.NoError(t, err) + } + }), + expectedLabels: []map[string]string{ + {"foo": "bar"}, + {"foo": "bar"}, + {"foo": "bar"}, + }, + // This is the output of buildTestChunks minus the samples outside the query range. + expectedSamples: [][]model.SamplePair{ + { + {Timestamp: model.Time(4000), Value: model.SampleValue(4)}, + }, + { + {Timestamp: model.Time(5000), Value: model.SampleValue(1)}, + {Timestamp: model.Time(6000), Value: model.SampleValue(2)}, + {Timestamp: model.Time(7000), Value: model.SampleValue(3)}, + {Timestamp: model.Time(8000), Value: model.SampleValue(4)}, + {Timestamp: model.Time(9000), Value: model.SampleValue(5)}, + }, + { + {Timestamp: model.Time(10000), Value: model.SampleValue(2)}, + {Timestamp: model.Time(11000), Value: model.SampleValue(3)}, + {Timestamp: model.Time(12000), Value: model.SampleValue(4)}, + }, + }, + expectedErrorContains: "", + }, + { + name: "unsupported content type", + httpHandler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "foobar") + }), + expectedErrorContains: "unsupported content type", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + server := httptest.NewServer(test.httpHandler) + defer server.Close() + + u, err := url.Parse(server.URL) + require.NoError(t, err) + + conf := &ClientConfig{ + URL: &config_util.URL{URL: u}, + Timeout: model.Duration(5 * time.Second), + ChunkedReadLimit: config.DefaultChunkedReadLimit, + } + c, err := NewReadClient("test", conf) + require.NoError(t, err) + + query := &prompb.Query{} + if test.query != nil { + query = test.query + } + + ss, err := c.Read(context.Background(), query, test.sortSeries) + if test.expectedErrorContains != "" { + require.ErrorContains(t, err, test.expectedErrorContains) + return + } + + require.NoError(t, err) + + i := 0 + + for ss.Next() { + require.NoError(t, ss.Err()) + s := ss.At() + + l := s.Labels() + require.Len(t, test.expectedLabels[i], l.Len()) + for k, v := range test.expectedLabels[i] { + require.True(t, l.Has(k)) + require.Equal(t, v, l.Get(k)) + } + + it := s.Iterator(nil) + j := 0 + + for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() { + require.NoError(t, it.Err()) + + ts, v := it.At() + expectedSample := test.expectedSamples[i][j] + + require.Equal(t, int64(expectedSample.Timestamp), ts) + require.Equal(t, float64(expectedSample.Value), v) + + j++ + } + + require.Len(t, test.expectedSamples[i], j) + + i++ + } + + require.NoError(t, ss.Err()) + }) + } +} + +func sampledResponseHTTPHandler(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/x-protobuf") + + resp := prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + { + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "foo2", Value: "bar"}, + }, + Samples: []prompb.Sample{ + {Value: float64(1), Timestamp: int64(0)}, + {Value: float64(2), Timestamp: int64(5)}, + }, + Exemplars: []prompb.Exemplar{}, + }, + { + Labels: []prompb.Label{ + {Name: "foo1", Value: "bar"}, + }, + Samples: []prompb.Sample{ + {Value: float64(3), Timestamp: int64(0)}, + {Value: float64(4), Timestamp: int64(5)}, + }, + Exemplars: []prompb.Exemplar{}, + }, + }, + }, + }, + } + b, err := proto.Marshal(&resp) + require.NoError(t, err) + + _, err = w.Write(snappy.Encode(nil, b)) + require.NoError(t, err) + } +} diff --git a/storage/remote/codec.go b/storage/remote/codec.go index c9220ca42..80bb81150 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -540,6 +540,220 @@ func (c *concreteSeriesIterator) Err() error { return nil } +// chunkedSeriesSet implements storage.SeriesSet. +type chunkedSeriesSet struct { + chunkedReader *ChunkedReader + respBody io.ReadCloser + mint, maxt int64 + cancel func(error) + + current storage.Series + err error +} + +func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet { + return &chunkedSeriesSet{ + chunkedReader: chunkedReader, + respBody: respBody, + mint: mint, + maxt: maxt, + cancel: cancel, + } +} + +// Next return true if there is a next series and false otherwise. It will +// block until the next series is available. +func (s *chunkedSeriesSet) Next() bool { + res := &prompb.ChunkedReadResponse{} + + err := s.chunkedReader.NextProto(res) + if err != nil { + if !errors.Is(err, io.EOF) { + s.err = err + _, _ = io.Copy(io.Discard, s.respBody) + } + + _ = s.respBody.Close() + s.cancel(err) + + return false + } + + s.current = &chunkedSeries{ + ChunkedSeries: prompb.ChunkedSeries{ + Labels: res.ChunkedSeries[0].Labels, + Chunks: res.ChunkedSeries[0].Chunks, + }, + mint: s.mint, + maxt: s.maxt, + } + + return true +} + +func (s *chunkedSeriesSet) At() storage.Series { + return s.current +} + +func (s *chunkedSeriesSet) Err() error { + return s.err +} + +func (s *chunkedSeriesSet) Warnings() annotations.Annotations { + return nil +} + +type chunkedSeries struct { + prompb.ChunkedSeries + mint, maxt int64 +} + +var _ storage.Series = &chunkedSeries{} + +func (s *chunkedSeries) Labels() labels.Labels { + b := labels.NewScratchBuilder(0) + return s.ToLabels(&b, nil) +} + +func (s *chunkedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + csIt, ok := it.(*chunkedSeriesIterator) + if ok { + csIt.reset(s.Chunks, s.mint, s.maxt) + return csIt + } + return newChunkedSeriesIterator(s.Chunks, s.mint, s.maxt) +} + +type chunkedSeriesIterator struct { + chunks []prompb.Chunk + idx int + cur chunkenc.Iterator + valType chunkenc.ValueType + mint, maxt int64 + + err error +} + +var _ chunkenc.Iterator = &chunkedSeriesIterator{} + +func newChunkedSeriesIterator(chunks []prompb.Chunk, mint, maxt int64) *chunkedSeriesIterator { + it := &chunkedSeriesIterator{} + it.reset(chunks, mint, maxt) + return it +} + +func (it *chunkedSeriesIterator) Next() chunkenc.ValueType { + if it.err != nil { + return chunkenc.ValNone + } + if len(it.chunks) == 0 { + return chunkenc.ValNone + } + + for it.valType = it.cur.Next(); it.valType != chunkenc.ValNone; it.valType = it.cur.Next() { + atT := it.AtT() + if atT > it.maxt { + it.chunks = nil // Exhaust this iterator so follow-up calls to Next or Seek return fast. + return chunkenc.ValNone + } + if atT >= it.mint { + return it.valType + } + } + + if it.idx >= len(it.chunks)-1 { + it.valType = chunkenc.ValNone + } else { + it.idx++ + it.resetIterator() + it.valType = it.Next() + } + + return it.valType +} + +func (it *chunkedSeriesIterator) Seek(t int64) chunkenc.ValueType { + if it.err != nil { + return chunkenc.ValNone + } + if len(it.chunks) == 0 { + return chunkenc.ValNone + } + + startIdx := it.idx + it.idx += sort.Search(len(it.chunks)-startIdx, func(i int) bool { + return it.chunks[startIdx+i].MaxTimeMs >= t + }) + if it.idx > startIdx { + it.resetIterator() + } else { + ts := it.cur.AtT() + if ts >= t { + return it.valType + } + } + + for it.valType = it.cur.Next(); it.valType != chunkenc.ValNone; it.valType = it.cur.Next() { + ts := it.cur.AtT() + if ts > it.maxt { + it.chunks = nil // Exhaust this iterator so follow-up calls to Next or Seek return fast. + return chunkenc.ValNone + } + if ts >= t && ts >= it.mint { + return it.valType + } + } + + it.valType = chunkenc.ValNone + return it.valType +} + +func (it *chunkedSeriesIterator) resetIterator() { + if it.idx < len(it.chunks) { + chunk := it.chunks[it.idx] + + decodedChunk, err := chunkenc.FromData(chunkenc.Encoding(chunk.Type), chunk.Data) + if err != nil { + it.err = err + return + } + + it.cur = decodedChunk.Iterator(nil) + } else { + it.cur = chunkenc.NewNopIterator() + } +} + +func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64) { + it.chunks = chunks + it.mint = mint + it.maxt = maxt + it.idx = 0 + if len(chunks) > 0 { + it.resetIterator() + } +} + +func (it *chunkedSeriesIterator) At() (ts int64, v float64) { + return it.cur.At() +} + +func (it *chunkedSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + return it.cur.AtHistogram(h) +} + +func (it *chunkedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return it.cur.AtFloatHistogram(fh) +} + +func (it *chunkedSeriesIterator) AtT() int64 { + return it.cur.AtT() +} + +func (it *chunkedSeriesIterator) Err() error { + return it.err +} + // validateLabelsAndMetricName validates the label names/values and metric names returned from remote read, // also making sure that there are no labels with duplicate names. func validateLabelsAndMetricName(ls []prompb.Label) error { @@ -612,15 +826,6 @@ func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro return result, nil } -// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric. -func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { - metric := make(model.Metric, len(labelPairs)) - for _, l := range labelPairs { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric -} - // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // snappy decompression. // Used also by documentation/examples/remote_storage. diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 279d10e41..404f1add7 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -16,6 +16,7 @@ package remote import ( "bytes" "fmt" + "io" "sync" "testing" @@ -24,6 +25,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" @@ -705,3 +707,270 @@ func (c *mockChunkIterator) Next() bool { func (c *mockChunkIterator) Err() error { return nil } + +func TestChunkedSeriesIterator(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + chks := buildTestChunks(t) + + it := newChunkedSeriesIterator(chks, 2000, 12000) + + require.NoError(t, it.err) + require.NotNil(t, it.cur) + + // Initial next; advance to first valid sample of first chunk. + res := it.Next() + require.Equal(t, chunkenc.ValFloat, res) + require.NoError(t, it.Err()) + + ts, v := it.At() + require.Equal(t, int64(2000), ts) + require.Equal(t, float64(2), v) + + // Next to the second sample of the first chunk. + res = it.Next() + require.Equal(t, chunkenc.ValFloat, res) + require.NoError(t, it.Err()) + + ts, v = it.At() + require.Equal(t, int64(3000), ts) + require.Equal(t, float64(3), v) + + // Attempt to seek to the first sample of the first chunk (should return current sample). + res = it.Seek(0) + require.Equal(t, chunkenc.ValFloat, res) + + ts, v = it.At() + require.Equal(t, int64(3000), ts) + require.Equal(t, float64(3), v) + + // Seek to the end of the first chunk. + res = it.Seek(4000) + require.Equal(t, chunkenc.ValFloat, res) + + ts, v = it.At() + require.Equal(t, int64(4000), ts) + require.Equal(t, float64(4), v) + + // Next to the first sample of the second chunk. + res = it.Next() + require.Equal(t, chunkenc.ValFloat, res) + require.NoError(t, it.Err()) + + ts, v = it.At() + require.Equal(t, int64(5000), ts) + require.Equal(t, float64(1), v) + + // Seek to the second sample of the third chunk. + res = it.Seek(10999) + require.Equal(t, chunkenc.ValFloat, res) + require.NoError(t, it.Err()) + + ts, v = it.At() + require.Equal(t, int64(11000), ts) + require.Equal(t, float64(3), v) + + // Attempt to seek to something past the last sample (should return false and exhaust the iterator). + res = it.Seek(99999) + require.Equal(t, chunkenc.ValNone, res) + require.NoError(t, it.Err()) + + // Attempt to next past the last sample (should return false as the iterator is exhausted). + res = it.Next() + require.Equal(t, chunkenc.ValNone, res) + require.NoError(t, it.Err()) + }) + + t.Run("invalid chunk encoding error", func(t *testing.T) { + chks := buildTestChunks(t) + + // Set chunk type to an invalid value. + chks[0].Type = 8 + + it := newChunkedSeriesIterator(chks, 0, 14000) + + res := it.Next() + require.Equal(t, chunkenc.ValNone, res) + + res = it.Seek(1000) + require.Equal(t, chunkenc.ValNone, res) + + require.ErrorContains(t, it.err, "invalid chunk encoding") + require.Nil(t, it.cur) + }) + + t.Run("empty chunks", func(t *testing.T) { + emptyChunks := make([]prompb.Chunk, 0) + + it1 := newChunkedSeriesIterator(emptyChunks, 0, 1000) + require.Equal(t, chunkenc.ValNone, it1.Next()) + require.Equal(t, chunkenc.ValNone, it1.Seek(1000)) + require.NoError(t, it1.Err()) + + var nilChunks []prompb.Chunk + + it2 := newChunkedSeriesIterator(nilChunks, 0, 1000) + require.Equal(t, chunkenc.ValNone, it2.Next()) + require.Equal(t, chunkenc.ValNone, it2.Seek(1000)) + require.NoError(t, it2.Err()) + }) +} + +func TestChunkedSeries(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + chks := buildTestChunks(t) + + s := chunkedSeries{ + ChunkedSeries: prompb.ChunkedSeries{ + Labels: []prompb.Label{ + {Name: "foo", Value: "bar"}, + {Name: "asdf", Value: "zxcv"}, + }, + Chunks: chks, + }, + } + + require.Equal(t, labels.FromStrings("asdf", "zxcv", "foo", "bar"), s.Labels()) + + it := s.Iterator(nil) + res := it.Next() // Behavior is undefined w/o the initial call to Next. + + require.Equal(t, chunkenc.ValFloat, res) + require.NoError(t, it.Err()) + + ts, v := it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(0), v) + }) +} + +func TestChunkedSeriesSet(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + buf := &bytes.Buffer{} + flusher := &mockFlusher{} + + w := NewChunkedWriter(buf, flusher) + r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) + + chks := buildTestChunks(t) + l := []prompb.Label{ + {Name: "foo", Value: "bar"}, + } + + for i, c := range chks { + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + readResp := prompb.ChunkedReadResponse{ + ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, + QueryIndex: int64(i), + } + + b, err := proto.Marshal(&readResp) + require.NoError(t, err) + + _, err = w.Write(b) + require.NoError(t, err) + } + + ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {}) + require.NoError(t, ss.Err()) + require.Nil(t, ss.Warnings()) + + res := ss.Next() + require.True(t, res) + require.NoError(t, ss.Err()) + + s := ss.At() + require.Equal(t, 1, s.Labels().Len()) + require.True(t, s.Labels().Has("foo")) + require.Equal(t, "bar", s.Labels().Get("foo")) + + it := s.Iterator(nil) + it.Next() + ts, v := it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(0), v) + + numResponses := 1 + for ss.Next() { + numResponses++ + } + require.Equal(t, numTestChunks, numResponses) + require.NoError(t, ss.Err()) + }) + + t.Run("chunked reader error", func(t *testing.T) { + buf := &bytes.Buffer{} + flusher := &mockFlusher{} + + w := NewChunkedWriter(buf, flusher) + r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) + + chks := buildTestChunks(t) + l := []prompb.Label{ + {Name: "foo", Value: "bar"}, + } + + for i, c := range chks { + cSeries := prompb.ChunkedSeries{Labels: l, Chunks: []prompb.Chunk{c}} + readResp := prompb.ChunkedReadResponse{ + ChunkedSeries: []*prompb.ChunkedSeries{&cSeries}, + QueryIndex: int64(i), + } + + b, err := proto.Marshal(&readResp) + require.NoError(t, err) + + b[0] = 0xFF // Corruption! + + _, err = w.Write(b) + require.NoError(t, err) + } + + ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {}) + require.NoError(t, ss.Err()) + require.Nil(t, ss.Warnings()) + + res := ss.Next() + require.False(t, res) + require.ErrorContains(t, ss.Err(), "proto: illegal wireType 7") + }) +} + +// mockFlusher implements http.Flusher. +type mockFlusher struct{} + +func (f *mockFlusher) Flush() {} + +const ( + numTestChunks = 3 + numSamplesPerTestChunk = 5 +) + +func buildTestChunks(t *testing.T) []prompb.Chunk { + startTime := int64(0) + chks := make([]prompb.Chunk, 0, numTestChunks) + + time := startTime + + for i := 0; i < numTestChunks; i++ { + c := chunkenc.NewXORChunk() + + a, err := c.Appender() + require.NoError(t, err) + + minTimeMs := time + + for j := 0; j < numSamplesPerTestChunk; j++ { + a.Append(time, float64(i+j)) + time += int64(1000) + } + + chks = append(chks, prompb.Chunk{ + MinTimeMs: minTimeMs, + MaxTimeMs: time, + Type: prompb.Chunk_XOR, + Data: c.Bytes(), + }) + } + + return chks +} diff --git a/storage/remote/read.go b/storage/remote/read.go index e54b14f1e..2ec48784d 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -165,11 +165,11 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } - res, err := q.client.Read(ctx, query) + res, err := q.client.Read(ctx, query, sortSeries) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } - return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) + return newSeriesSetFilter(res, added) } // addExternalLabels adds matchers for each external label. External labels diff --git a/storage/remote/read_handler_test.go b/storage/remote/read_handler_test.go index a68187268..4cd4647e7 100644 --- a/storage/remote/read_handler_test.go +++ b/storage/remote/read_handler_test.go @@ -179,7 +179,7 @@ func BenchmarkStreamReadEndpoint(b *testing.B) { require.Equal(b, 2, recorder.Code/100) var results []*prompb.ChunkedReadResponse - stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) + stream := NewChunkedReader(recorder.Result().Body, config.DefaultChunkedReadLimit, nil) for { res := &prompb.ChunkedReadResponse{} @@ -280,7 +280,7 @@ func TestStreamReadEndpoint(t *testing.T) { require.Equal(t, "", recorder.Result().Header.Get("Content-Encoding")) var results []*prompb.ChunkedReadResponse - stream := NewChunkedReader(recorder.Result().Body, DefaultChunkedReadLimit, nil) + stream := NewChunkedReader(recorder.Result().Body, config.DefaultChunkedReadLimit, nil) for { res := &prompb.ChunkedReadResponse{} err := stream.NextProto(res) diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 357bdba1f..d63cefc3f 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/testutil" ) @@ -198,7 +199,7 @@ type mockedRemoteClient struct { b labels.ScratchBuilder } -func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { +func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) { if c.got != nil { return nil, fmt.Errorf("expected only one call to remote client got: %v", query) } @@ -227,7 +228,7 @@ func (c *mockedRemoteClient) Read(_ context.Context, query *prompb.Query) (*prom q.Timeseries = append(q.Timeseries, &prompb.TimeSeries{Labels: s.Labels}) } } - return q, nil + return FromQueryResult(sortSeries, q), nil } func (c *mockedRemoteClient) reset() { diff --git a/storage/remote/storage.go b/storage/remote/storage.go index afa2d411a..05634f179 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -115,6 +115,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { c, err := NewReadClient(name, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, + ChunkedReadLimit: rrConf.ChunkedReadLimit, HTTPClientConfig: rrConf.HTTPClientConfig, Headers: rrConf.Headers, }) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index ba38ddc97..261ed6b61 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1074,6 +1074,9 @@ func setupRemote(s storage.Storage) *httptest.Server { } } + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + if err := remote.EncodeReadResponse(&resp, w); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return