diff --git a/prompb/remote.pb.go b/prompb/remote.pb.go index 5dcb254f2..56fd03e94 100644 --- a/prompb/remote.pb.go +++ b/prompb/remote.pb.go @@ -340,7 +340,7 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. type ChunkedReadResponse struct { ChunkedSeries []*ChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"` // query_index represents an index of the query from ReadRequest.queries these chunks relates to. diff --git a/prompb/remote.proto b/prompb/remote.proto index da2b06f29..ecd8f0bb1 100644 --- a/prompb/remote.proto +++ b/prompb/remote.proto @@ -73,7 +73,7 @@ message QueryResult { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. message ChunkedReadResponse { repeated prometheus.ChunkedSeries chunked_series = 1; diff --git a/promql/engine.go b/promql/engine.go index 2f442acbe..96d66f977 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -596,7 +596,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, warnings, err } - // TODO(fabxc): order ensured by storage? // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) sort.Sort(mat) diff --git a/promql/engine_test.go b/promql/engine_test.go index 0c20b6a6f..230cee482 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "strings" + "sort" "testing" "time" @@ -179,6 +180,9 @@ type errQuerier struct { func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } +func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return errSeriesSet{err: q.err}, nil, q.err +} func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } func (*errQuerier) Close() error { return nil } @@ -236,7 +240,10 @@ type paramCheckerQuerier struct { t *testing.T } -func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, m...) +} +func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.grouping, sp.Grouping) @@ -1111,7 +1118,9 @@ func TestSubquerySelector(t *testing.T) { res := qry.Exec(test.Context()) testutil.Equals(t, c.Result.Err, res.Err) - testutil.Equals(t, c.Result.Value, res.Value) + mat := res.Value.(Matrix) + sort.Sort(mat) + testutil.Equals(t, c.Result.Value, mat) } } } diff --git a/storage/fanout.go b/storage/fanout.go index 41ccff5a5..b4746a74c 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -229,10 +229,19 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { // Select returns a set of series that matches the given label matchers. func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + if len(q.queriers) != 1 { + // We need to sort for NewMergeSeriesSet to work. + return q.SelectSorted(params, matchers...) + } + return q.queriers[0].Select(params, matchers...) +} + +// SelectSorted returns a set of sorted series that matches the given label matchers. +func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) var warnings Warnings for _, querier := range q.queriers { - set, wrn, err := querier.Select(params, matchers...) + set, wrn, err := querier.SelectSorted(params, matchers...) q.setQuerierMap[set] = querier if wrn != nil { warnings = append(warnings, wrn...) diff --git a/storage/interface.go b/storage/interface.go index 6c2343daa..0ff3da99a 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -54,6 +54,9 @@ type Querier interface { // Select returns a set of series that matches the given label matchers. Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // SelectSorted returns a sorted set of series that matches the given label matchers. + SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, Warnings, error) diff --git a/storage/noop.go b/storage/noop.go index acdd79f4b..797006dfb 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -30,6 +30,10 @@ func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warning return NoopSeriesSet(), nil, nil } +func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { + return NoopSeriesSet(), nil, nil +} + func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) { return nil, nil, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 6321b6f45..10500f06a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -144,7 +144,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, return resp, nil } -// FromQueryResult unpacks a QueryResult proto. +// FromQueryResult unpacks and sorts a QueryResult proto. func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { diff --git a/storage/remote/read.go b/storage/remote/read.go index 2cffdabfb..3e5c9573c 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -60,6 +60,12 @@ type querier struct { // Select implements storage.Querier and uses the given matchers to read series // sets from the Client. func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(p, matchers...) +} + +// SelectSorted implements storage.Querier and uses the given matchers to read series +// sets from the Client. +func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { query, err := ToQuery(q.mint, q.maxt, matchers, p) if err != nil { return nil, nil, err @@ -74,6 +80,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) ( return nil, nil, fmt.Errorf("remote_read: %v", err) } + // FromQueryResult sorts. return FromQueryResult(res), nil, nil } diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index d982ef5cf..1fa355761 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -250,6 +250,14 @@ func (q querier) Select(_ *storage.SelectParams, ms ...*labels.Matcher) (storage return seriesSet{set: set}, nil, nil } +func (q querier) SelectSorted(_ *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + set, err := q.q.SelectSorted(ms...) + if err != nil { + return nil, nil, err + } + return seriesSet{set: set}, nil, nil +} + func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { v, err := q.q.LabelValues(name) return v, nil, err diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 3ad49c646..70a0f56e8 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -790,7 +790,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() - ss, err := q.Select(del.ms...) + ss, err := q.SelectSorted(del.ms...) testutil.Ok(t, err) // Build the mockSeriesSet. matchedSeries := make([]Series, 0, len(matched)) diff --git a/tsdb/querier.go b/tsdb/querier.go index bd2fa9d13..b7323abbf 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -34,6 +34,9 @@ type Querier interface { // Select returns a set of series that matches the given label matchers. Select(...*labels.Matcher) (SeriesSet, error) + // SelectSorted returns a sorted set of series that matches the given label matcher. + SelectSorted(...*labels.Matcher) (SeriesSet, error) + // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. LabelValues(string) ([]string, error) @@ -106,14 +109,21 @@ func (q *querier) lvals(qs []Querier, n string) ([]string, error) { } func (q *querier) Select(ms ...*labels.Matcher) (SeriesSet, error) { + if len(q.blocks) != 1 { + return q.SelectSorted(ms...) + } + // Sorting Head series is slow, and unneeded when only the + // Head is being queried. Sorting blocks is a noop. + return q.blocks[0].Select(ms...) +} + +func (q *querier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { if len(q.blocks) == 0 { return EmptySeriesSet(), nil } ss := make([]SeriesSet, len(q.blocks)) - var s SeriesSet - var err error for i, b := range q.blocks { - s, err = b.Select(ms...) + s, err := b.SelectSorted(ms...) if err != nil { return nil, err } @@ -142,12 +152,16 @@ func (q *verticalQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) { return q.sel(q.blocks, ms) } +func (q *verticalQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { + return q.sel(q.blocks, ms) +} + func (q *verticalQuerier) sel(qs []Querier, ms []*labels.Matcher) (SeriesSet, error) { if len(qs) == 0 { return EmptySeriesSet(), nil } if len(qs) == 1 { - return qs[0].Select(ms...) + return qs[0].SelectSorted(ms...) } l := len(qs) / 2 @@ -217,6 +231,24 @@ func (q *blockQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) { }, nil } +func (q *blockQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { + base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...) + if err != nil { + return nil, err + } + return &blockSeriesSet{ + set: &populatedChunkSeries{ + set: base, + chunks: q.chunks, + mint: q.mint, + maxt: q.maxt, + }, + + mint: q.mint, + maxt: q.maxt, + }, nil +} + func (q *blockQuerier) LabelValues(name string) ([]string, error) { return q.index.LabelValues(name) } @@ -299,7 +331,7 @@ func findSetMatches(pattern string) []string { } // PostingsForMatchers assembles a single postings iterator against the index reader -// based on the given matchers. +// based on the given matchers. The resulting postings are not ordered by series. func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) { var its, notIts []index.Postings // See which label must be non-empty. @@ -379,7 +411,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, it = index.Without(it, n) } - return ix.SortedPostings(it), nil + return it, nil } func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) { @@ -689,6 +721,16 @@ type baseChunkSeries struct { // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { + return lookupChunkSeries(false, ir, tr, ms...) +} + +// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet +// over them. It drops chunks based on tombstones in the given reader. Series will be in order. +func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { + return lookupChunkSeries(true, ir, tr, ms...) +} + +func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { tr = tombstones.NewMemTombstones() } @@ -696,6 +738,9 @@ func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Match if err != nil { return nil, err } + if sorted { + p = ir.SortedPostings(p) + } return &baseChunkSeries{ p: p, index: ir, diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 9bfee44b5..9918fefc2 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -1121,7 +1121,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error { + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { + // The streaming API provides sorted series. + set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) + if err != nil { + return err + } return remote.StreamChunkedReadResponses( remote.NewChunkedWriter(w, f), @@ -1149,7 +1154,11 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { Results: make([]*prompb.QueryResult, len(req.Queries)), } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error { + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { + set, _, err := querier.Select(selectParams, filteredMatchers...) + if err != nil { + return err + } resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit) if err != nil { @@ -1204,7 +1213,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } -func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(set storage.SeriesSet) error) error { +func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) if err != nil { return err @@ -1231,11 +1240,7 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern } }() - set, _, err := querier.Select(selectParams, filteredMatchers...) - if err != nil { - return err - } - return seriesHandleFn(set) + return seriesHandleFn(querier, selectParams, filteredMatchers) } func (api *API) deleteSeries(r *http.Request) apiFuncResult {