mirror of https://github.com/prometheus/prometheus
Merge pull request #13777 from roidelapluie/remoteread2
Chunked remote read: close the querier earlierpull/13781/head
commit
d1abc3f255
|
@ -202,34 +202,16 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
|||
return err
|
||||
}
|
||||
|
||||
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
|
||||
if err != nil {
|
||||
chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers)
|
||||
if err := chunks.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := querier.Close(); err != nil {
|
||||
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
var hints *storage.SelectHints
|
||||
if query.Hints != nil {
|
||||
hints = &storage.SelectHints{
|
||||
Start: query.Hints.StartMs,
|
||||
End: query.Hints.EndMs,
|
||||
Step: query.Hints.StepMs,
|
||||
Func: query.Hints.Func,
|
||||
Grouping: query.Hints.Grouping,
|
||||
Range: query.Hints.RangeMs,
|
||||
By: query.Hints.By,
|
||||
}
|
||||
}
|
||||
|
||||
ws, err := StreamChunkedReadResponses(
|
||||
NewChunkedWriter(w, f),
|
||||
int64(i),
|
||||
// The streaming API has to provide the series sorted.
|
||||
querier.Select(ctx, true, hints, filteredMatchers...),
|
||||
chunks,
|
||||
sortedExternalLabels,
|
||||
h.remoteReadMaxBytesInFrame,
|
||||
h.marshalPool,
|
||||
|
@ -254,6 +236,35 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
|||
}
|
||||
}
|
||||
|
||||
// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet,
|
||||
// encapsulating the operation in its own function to ensure timely release of
|
||||
// the querier resources.
|
||||
func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet {
|
||||
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
|
||||
if err != nil {
|
||||
return storage.ErrChunkSeriesSet(err)
|
||||
}
|
||||
defer func() {
|
||||
if err := querier.Close(); err != nil {
|
||||
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
var hints *storage.SelectHints
|
||||
if query.Hints != nil {
|
||||
hints = &storage.SelectHints{
|
||||
Start: query.Hints.StartMs,
|
||||
End: query.Hints.EndMs,
|
||||
Step: query.Hints.StepMs,
|
||||
Func: query.Hints.Func,
|
||||
Grouping: query.Hints.Grouping,
|
||||
Range: query.Hints.RangeMs,
|
||||
By: query.Hints.By,
|
||||
}
|
||||
}
|
||||
return querier.Select(ctx, true, hints, filteredMatchers...)
|
||||
}
|
||||
|
||||
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
||||
// to a matcher that looks for an empty label,
|
||||
// as that label should not be present in the storage.
|
||||
|
|
Loading…
Reference in New Issue