From 38d32e06862f6b72700f67043ce574508b5697f0 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Fri, 17 Jan 2020 11:21:44 +0000 Subject: [PATCH] Don't sort postings if we only have one block. Sorting the heads postings can be quite slow. We only need sorted series when merging with another querier, so only sort then. This will make big queries that only touch the head faster, though queries that touch both the head and a block will still be the same speed. This probably won't help much with graphing unless the range is under an hour, however it should make most recording rules faster. Add gaurantee that remote read streaming produces sorted series. PromQL benchmarks for histograms show only 2-3% improvement, but they're only over 1k series. benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-4 1375486282 507657736 -63.09% BenchmarkQuerierSelect/Head/10of1000000-4 1387859004 507769850 -63.41% BenchmarkQuerierSelect/Head/100of1000000-4 1387087935 506029110 -63.52% BenchmarkQuerierSelect/Head/1000of1000000-4 1386869064 504521986 -63.62% BenchmarkQuerierSelect/Head/10000of1000000-4 1386213685 505210422 -63.55% BenchmarkQuerierSelect/Head/100000of1000000-4 1392754988 529842406 -61.96% BenchmarkQuerierSelect/Head/1000000of1000000-4 1569414722 725059506 -53.80% BenchmarkQuerierSelect/SortedHead/1of1000000-4 1381019902 1370495863 -0.76% BenchmarkQuerierSelect/SortedHead/10of1000000-4 1375696209 1366789468 -0.65% BenchmarkQuerierSelect/SortedHead/100of1000000-4 1386009422 1364519297 -1.55% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 1377700532 1364486191 -0.96% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 1383539536 1369545314 -1.01% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 1410089163 1394731339 -1.09% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 1634744148 1581554956 -3.25% BenchmarkQuerierSelect/Block/1of1000000-4 881741242 879839470 -0.22% BenchmarkQuerierSelect/Block/10of1000000-4 880381562 882846038 +0.28% BenchmarkQuerierSelect/Block/100of1000000-4 887519357 881016916 -0.73% BenchmarkQuerierSelect/Block/1000of1000000-4 902194205 883433524 -2.08% BenchmarkQuerierSelect/Block/10000of1000000-4 892321964 885130170 -0.81% BenchmarkQuerierSelect/Block/100000of1000000-4 938604466 933527150 -0.54% BenchmarkQuerierSelect/Block/1000000of1000000-4 1313510845 1295881124 -1.34% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-4 4000056 4000018 -0.00% BenchmarkQuerierSelect/Head/10of1000000-4 4000074 4000036 -0.00% BenchmarkQuerierSelect/Head/100of1000000-4 4000254 4000216 -0.00% BenchmarkQuerierSelect/Head/1000of1000000-4 4002054 4002016 -0.00% BenchmarkQuerierSelect/Head/10000of1000000-4 4020054 4020016 -0.00% BenchmarkQuerierSelect/Head/100000of1000000-4 4200054 4200016 -0.00% BenchmarkQuerierSelect/Head/1000000of1000000-4 6000054 6000016 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-4 4000071 4000071 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 4000089 4000089 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 4000269 4000269 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 4002069 4002069 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 4020069 4020069 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 4200069 4200069 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 6000069 6000069 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 6000023 6000022 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 6000059 6000058 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 6000419 6000418 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 6004019 6004018 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 6040019 6040018 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 6400019 6400018 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 10000020 10000019 -0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-4 229192200 176001176 -23.21% BenchmarkQuerierSelect/Head/10of1000000-4 229193352 176002328 -23.21% BenchmarkQuerierSelect/Head/100of1000000-4 229204872 176013848 -23.21% BenchmarkQuerierSelect/Head/1000of1000000-4 229320072 176129048 -23.20% BenchmarkQuerierSelect/Head/10000of1000000-4 230472072 177281048 -23.08% BenchmarkQuerierSelect/Head/100000of1000000-4 241992072 188801048 -21.98% BenchmarkQuerierSelect/Head/1000000of1000000-4 357192072 304001048 -14.89% BenchmarkQuerierSelect/SortedHead/1of1000000-4 229193928 229193928 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 229195080 229195080 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 229206600 229206600 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 229321800 229321800 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 230473800 230473800 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 241993800 241993800 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 357193800 357193800 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 227201516 227201500 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 227202924 227202908 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 227217036 227217020 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 227358156 227358140 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 228769356 228769340 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 242881356 242881340 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 384001616 384001600 -0.00% Signed-off-by: Brian Brazil --- prompb/remote.pb.go | 2 +- prompb/remote.proto | 2 +- promql/engine.go | 1 - promql/engine_test.go | 13 ++++++++-- storage/fanout.go | 11 +++++++- storage/interface.go | 3 +++ storage/noop.go | 4 +++ storage/remote/codec.go | 2 +- storage/remote/read.go | 7 +++++ storage/tsdb/tsdb.go | 8 ++++++ tsdb/head_test.go | 2 +- tsdb/querier.go | 57 ++++++++++++++++++++++++++++++++++++----- web/api/v1/api.go | 21 +++++++++------ 13 files changed, 111 insertions(+), 22 deletions(-) diff --git a/prompb/remote.pb.go b/prompb/remote.pb.go index 5dcb254f2..56fd03e94 100644 --- a/prompb/remote.pb.go +++ b/prompb/remote.pb.go @@ -340,7 +340,7 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. type ChunkedReadResponse struct { ChunkedSeries []*ChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"` // query_index represents an index of the query from ReadRequest.queries these chunks relates to. diff --git a/prompb/remote.proto b/prompb/remote.proto index da2b06f29..ecd8f0bb1 100644 --- a/prompb/remote.proto +++ b/prompb/remote.proto @@ -73,7 +73,7 @@ message QueryResult { // ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. // We strictly stream full series after series, optionally split by time. This means that a single frame can contain // partition of the single series, but once a new series is started to be streamed it means that no more chunks will -// be sent for previous one. +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. message ChunkedReadResponse { repeated prometheus.ChunkedSeries chunked_series = 1; diff --git a/promql/engine.go b/promql/engine.go index 2f442acbe..96d66f977 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -596,7 +596,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, warnings, err } - // TODO(fabxc): order ensured by storage? // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) sort.Sort(mat) diff --git a/promql/engine_test.go b/promql/engine_test.go index 0c20b6a6f..230cee482 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "strings" + "sort" "testing" "time" @@ -179,6 +180,9 @@ type errQuerier struct { func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } +func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return errSeriesSet{err: q.err}, nil, q.err +} func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } func (*errQuerier) Close() error { return nil } @@ -236,7 +240,10 @@ type paramCheckerQuerier struct { t *testing.T } -func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, m...) +} +func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.grouping, sp.Grouping) @@ -1111,7 +1118,9 @@ func TestSubquerySelector(t *testing.T) { res := qry.Exec(test.Context()) testutil.Equals(t, c.Result.Err, res.Err) - testutil.Equals(t, c.Result.Value, res.Value) + mat := res.Value.(Matrix) + sort.Sort(mat) + testutil.Equals(t, c.Result.Value, mat) } } } diff --git a/storage/fanout.go b/storage/fanout.go index 41ccff5a5..b4746a74c 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -229,10 +229,19 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { // Select returns a set of series that matches the given label matchers. func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + if len(q.queriers) != 1 { + // We need to sort for NewMergeSeriesSet to work. + return q.SelectSorted(params, matchers...) + } + return q.queriers[0].Select(params, matchers...) +} + +// SelectSorted returns a set of sorted series that matches the given label matchers. +func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) var warnings Warnings for _, querier := range q.queriers { - set, wrn, err := querier.Select(params, matchers...) + set, wrn, err := querier.SelectSorted(params, matchers...) q.setQuerierMap[set] = querier if wrn != nil { warnings = append(warnings, wrn...) diff --git a/storage/interface.go b/storage/interface.go index 6c2343daa..0ff3da99a 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -54,6 +54,9 @@ type Querier interface { // Select returns a set of series that matches the given label matchers. Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // SelectSorted returns a sorted set of series that matches the given label matchers. + SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // LabelValues returns all potential values for a label name. LabelValues(name string) ([]string, Warnings, error) diff --git a/storage/noop.go b/storage/noop.go index acdd79f4b..797006dfb 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -30,6 +30,10 @@ func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warning return NoopSeriesSet(), nil, nil } +func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { + return NoopSeriesSet(), nil, nil +} + func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) { return nil, nil, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 6321b6f45..10500f06a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -144,7 +144,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, return resp, nil } -// FromQueryResult unpacks a QueryResult proto. +// FromQueryResult unpacks and sorts a QueryResult proto. func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { diff --git a/storage/remote/read.go b/storage/remote/read.go index 2cffdabfb..3e5c9573c 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -60,6 +60,12 @@ type querier struct { // Select implements storage.Querier and uses the given matchers to read series // sets from the Client. func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(p, matchers...) +} + +// SelectSorted implements storage.Querier and uses the given matchers to read series +// sets from the Client. +func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { query, err := ToQuery(q.mint, q.maxt, matchers, p) if err != nil { return nil, nil, err @@ -74,6 +80,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) ( return nil, nil, fmt.Errorf("remote_read: %v", err) } + // FromQueryResult sorts. return FromQueryResult(res), nil, nil } diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index d982ef5cf..1fa355761 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -250,6 +250,14 @@ func (q querier) Select(_ *storage.SelectParams, ms ...*labels.Matcher) (storage return seriesSet{set: set}, nil, nil } +func (q querier) SelectSorted(_ *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + set, err := q.q.SelectSorted(ms...) + if err != nil { + return nil, nil, err + } + return seriesSet{set: set}, nil, nil +} + func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { v, err := q.q.LabelValues(name) return v, nil, err diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 3ad49c646..70a0f56e8 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -790,7 +790,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() - ss, err := q.Select(del.ms...) + ss, err := q.SelectSorted(del.ms...) testutil.Ok(t, err) // Build the mockSeriesSet. matchedSeries := make([]Series, 0, len(matched)) diff --git a/tsdb/querier.go b/tsdb/querier.go index bd2fa9d13..b7323abbf 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -34,6 +34,9 @@ type Querier interface { // Select returns a set of series that matches the given label matchers. Select(...*labels.Matcher) (SeriesSet, error) + // SelectSorted returns a sorted set of series that matches the given label matcher. + SelectSorted(...*labels.Matcher) (SeriesSet, error) + // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. LabelValues(string) ([]string, error) @@ -106,14 +109,21 @@ func (q *querier) lvals(qs []Querier, n string) ([]string, error) { } func (q *querier) Select(ms ...*labels.Matcher) (SeriesSet, error) { + if len(q.blocks) != 1 { + return q.SelectSorted(ms...) + } + // Sorting Head series is slow, and unneeded when only the + // Head is being queried. Sorting blocks is a noop. + return q.blocks[0].Select(ms...) +} + +func (q *querier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { if len(q.blocks) == 0 { return EmptySeriesSet(), nil } ss := make([]SeriesSet, len(q.blocks)) - var s SeriesSet - var err error for i, b := range q.blocks { - s, err = b.Select(ms...) + s, err := b.SelectSorted(ms...) if err != nil { return nil, err } @@ -142,12 +152,16 @@ func (q *verticalQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) { return q.sel(q.blocks, ms) } +func (q *verticalQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { + return q.sel(q.blocks, ms) +} + func (q *verticalQuerier) sel(qs []Querier, ms []*labels.Matcher) (SeriesSet, error) { if len(qs) == 0 { return EmptySeriesSet(), nil } if len(qs) == 1 { - return qs[0].Select(ms...) + return qs[0].SelectSorted(ms...) } l := len(qs) / 2 @@ -217,6 +231,24 @@ func (q *blockQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) { }, nil } +func (q *blockQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) { + base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...) + if err != nil { + return nil, err + } + return &blockSeriesSet{ + set: &populatedChunkSeries{ + set: base, + chunks: q.chunks, + mint: q.mint, + maxt: q.maxt, + }, + + mint: q.mint, + maxt: q.maxt, + }, nil +} + func (q *blockQuerier) LabelValues(name string) ([]string, error) { return q.index.LabelValues(name) } @@ -299,7 +331,7 @@ func findSetMatches(pattern string) []string { } // PostingsForMatchers assembles a single postings iterator against the index reader -// based on the given matchers. +// based on the given matchers. The resulting postings are not ordered by series. func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) { var its, notIts []index.Postings // See which label must be non-empty. @@ -379,7 +411,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, it = index.Without(it, n) } - return ix.SortedPostings(it), nil + return it, nil } func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) { @@ -689,6 +721,16 @@ type baseChunkSeries struct { // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { + return lookupChunkSeries(false, ir, tr, ms...) +} + +// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet +// over them. It drops chunks based on tombstones in the given reader. Series will be in order. +func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { + return lookupChunkSeries(true, ir, tr, ms...) +} + +func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { tr = tombstones.NewMemTombstones() } @@ -696,6 +738,9 @@ func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Match if err != nil { return nil, err } + if sorted { + p = ir.SortedPostings(p) + } return &baseChunkSeries{ p: p, index: ir, diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 9bfee44b5..9918fefc2 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -1121,7 +1121,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error { + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { + // The streaming API provides sorted series. + set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) + if err != nil { + return err + } return remote.StreamChunkedReadResponses( remote.NewChunkedWriter(w, f), @@ -1149,7 +1154,11 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { Results: make([]*prompb.QueryResult, len(req.Queries)), } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error { + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { + set, _, err := querier.Select(selectParams, filteredMatchers...) + if err != nil { + return err + } resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit) if err != nil { @@ -1204,7 +1213,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } -func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(set storage.SeriesSet) error) error { +func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) if err != nil { return err @@ -1231,11 +1240,7 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern } }() - set, _, err := querier.Select(selectParams, filteredMatchers...) - if err != nil { - return err - } - return seriesHandleFn(set) + return seriesHandleFn(querier, selectParams, filteredMatchers) } func (api *API) deleteSeries(r *http.Request) apiFuncResult {