mirror of https://github.com/prometheus/prometheus
Revert "Chunked remote read: close the querier earlier" Signed-off-by: Bryan Boreham <bjboreham@gmail.com>pull/14645/head
parent
14cfec3f60
commit
72d84f511f
|
@ -202,16 +202,34 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers)
|
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
|
||||||
if err := chunks.Err(); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := querier.Close(); err != nil {
|
||||||
|
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var hints *storage.SelectHints
|
||||||
|
if query.Hints != nil {
|
||||||
|
hints = &storage.SelectHints{
|
||||||
|
Start: query.Hints.StartMs,
|
||||||
|
End: query.Hints.EndMs,
|
||||||
|
Step: query.Hints.StepMs,
|
||||||
|
Func: query.Hints.Func,
|
||||||
|
Grouping: query.Hints.Grouping,
|
||||||
|
Range: query.Hints.RangeMs,
|
||||||
|
By: query.Hints.By,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ws, err := StreamChunkedReadResponses(
|
ws, err := StreamChunkedReadResponses(
|
||||||
NewChunkedWriter(w, f),
|
NewChunkedWriter(w, f),
|
||||||
int64(i),
|
int64(i),
|
||||||
// The streaming API has to provide the series sorted.
|
// The streaming API has to provide the series sorted.
|
||||||
chunks,
|
querier.Select(ctx, true, hints, filteredMatchers...),
|
||||||
sortedExternalLabels,
|
sortedExternalLabels,
|
||||||
h.remoteReadMaxBytesInFrame,
|
h.remoteReadMaxBytesInFrame,
|
||||||
h.marshalPool,
|
h.marshalPool,
|
||||||
|
@ -236,35 +254,6 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet,
|
|
||||||
// encapsulating the operation in its own function to ensure timely release of
|
|
||||||
// the querier resources.
|
|
||||||
func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet {
|
|
||||||
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
|
|
||||||
if err != nil {
|
|
||||||
return storage.ErrChunkSeriesSet(err)
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err := querier.Close(); err != nil {
|
|
||||||
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var hints *storage.SelectHints
|
|
||||||
if query.Hints != nil {
|
|
||||||
hints = &storage.SelectHints{
|
|
||||||
Start: query.Hints.StartMs,
|
|
||||||
End: query.Hints.EndMs,
|
|
||||||
Step: query.Hints.StepMs,
|
|
||||||
Func: query.Hints.Func,
|
|
||||||
Grouping: query.Hints.Grouping,
|
|
||||||
Range: query.Hints.RangeMs,
|
|
||||||
By: query.Hints.By,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return querier.Select(ctx, true, hints, filteredMatchers...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
||||||
// to a matcher that looks for an empty label,
|
// to a matcher that looks for an empty label,
|
||||||
// as that label should not be present in the storage.
|
// as that label should not be present in the storage.
|
||||||
|
|
Loading…
Reference in New Issue