mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
283 lines
8.1 KiB
283 lines
8.1 KiB
// Copyright 2017 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package remote |
|
|
|
import ( |
|
"context" |
|
|
|
"github.com/pkg/errors" |
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
) |
|
|
|
type sampleAndChunkQueryableClient struct { |
|
client ReadClient |
|
externalLabels labels.Labels |
|
requiredMatchers []*labels.Matcher |
|
readRecent bool |
|
callback startTimeCallback |
|
} |
|
|
|
// NewSampleAndChunkQueryableClient returns a storage.SampleAndChunkQueryable which queries the given client to select series sets. |
|
func NewSampleAndChunkQueryableClient( |
|
c ReadClient, |
|
externalLabels labels.Labels, |
|
requiredMatchers []*labels.Matcher, |
|
readRecent bool, |
|
callback startTimeCallback, |
|
) storage.SampleAndChunkQueryable { |
|
return &sampleAndChunkQueryableClient{ |
|
client: c, |
|
|
|
externalLabels: externalLabels, |
|
requiredMatchers: requiredMatchers, |
|
readRecent: readRecent, |
|
callback: callback, |
|
} |
|
} |
|
|
|
func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { |
|
q := &querier{ |
|
ctx: ctx, |
|
mint: mint, |
|
maxt: maxt, |
|
client: c.client, |
|
externalLabels: c.externalLabels, |
|
requiredMatchers: c.requiredMatchers, |
|
} |
|
if c.readRecent { |
|
return q, nil |
|
} |
|
|
|
var ( |
|
noop bool |
|
err error |
|
) |
|
q.maxt, noop, err = c.preferLocalStorage(mint, maxt) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if noop { |
|
return storage.NoopQuerier(), nil |
|
} |
|
return q, nil |
|
} |
|
|
|
func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { |
|
cq := &chunkQuerier{ |
|
querier: querier{ |
|
ctx: ctx, |
|
mint: mint, |
|
maxt: maxt, |
|
client: c.client, |
|
externalLabels: c.externalLabels, |
|
requiredMatchers: c.requiredMatchers, |
|
}, |
|
} |
|
if c.readRecent { |
|
return cq, nil |
|
} |
|
|
|
var ( |
|
noop bool |
|
err error |
|
) |
|
cq.querier.maxt, noop, err = c.preferLocalStorage(mint, maxt) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if noop { |
|
return storage.NoopChunkedQuerier(), nil |
|
} |
|
return cq, nil |
|
} |
|
|
|
// preferLocalStorage returns noop if requested timeframe can be answered completely by the local TSDB, and |
|
// reduces maxt if the timeframe can be partially answered by TSDB. |
|
func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cmaxt int64, noop bool, err error) { |
|
localStartTime, err := c.callback() |
|
if err != nil { |
|
return 0, false, err |
|
} |
|
cmaxt = maxt |
|
|
|
// Avoid queries whose time range is later than the first timestamp in local DB. |
|
if mint > localStartTime { |
|
return 0, true, nil |
|
} |
|
// Query only samples older than the first timestamp in local DB. |
|
if maxt > localStartTime { |
|
cmaxt = localStartTime |
|
} |
|
return cmaxt, false, nil |
|
} |
|
|
|
type querier struct { |
|
ctx context.Context |
|
mint, maxt int64 |
|
client ReadClient |
|
|
|
// Derived from configuration. |
|
externalLabels labels.Labels |
|
requiredMatchers []*labels.Matcher |
|
} |
|
|
|
// Select implements storage.Querier and uses the given matchers to read series sets from the client. |
|
// Select also adds equality matchers for all external labels to the list of matchers before calling remote endpoint. |
|
// The added external labels are removed from the returned series sets. |
|
// |
|
// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the |
|
// requiredMatchers. Otherwise it'll just call remote endpoint. |
|
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { |
|
if len(q.requiredMatchers) > 0 { |
|
// Copy to not modify slice configured by user. |
|
requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...) |
|
for _, m := range matchers { |
|
for i, r := range requiredMatchers { |
|
if m.Type == labels.MatchEqual && m.Name == r.Name && m.Value == r.Value { |
|
// Requirement matched. |
|
requiredMatchers = append(requiredMatchers[:i], requiredMatchers[i+1:]...) |
|
break |
|
} |
|
} |
|
if len(requiredMatchers) == 0 { |
|
break |
|
} |
|
} |
|
if len(requiredMatchers) > 0 { |
|
return storage.NoopSeriesSet() |
|
} |
|
} |
|
|
|
m, added := q.addExternalLabels(matchers) |
|
query, err := ToQuery(q.mint, q.maxt, m, hints) |
|
if err != nil { |
|
return storage.ErrSeriesSet(errors.Wrap(err, "toQuery")) |
|
} |
|
|
|
res, err := q.client.Read(q.ctx, query) |
|
if err != nil { |
|
return storage.ErrSeriesSet(errors.Wrap(err, "remote_read")) |
|
} |
|
return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) |
|
} |
|
|
|
// addExternalLabels adds matchers for each external label. External labels |
|
// that already have a corresponding user-supplied matcher are skipped, as we |
|
// assume that the user explicitly wants to select a different value for them. |
|
// We return the new set of matchers, along with a map of labels for which |
|
// matchers were added, so that these can later be removed from the result |
|
// time series again. |
|
func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) { |
|
el := make(labels.Labels, len(q.externalLabels)) |
|
copy(el, q.externalLabels) |
|
|
|
// ms won't be sorted, so have to O(n^2) the search. |
|
for _, m := range ms { |
|
for i := 0; i < len(el); { |
|
if el[i].Name == m.Name { |
|
el = el[:i+copy(el[i:], el[i+1:])] |
|
continue |
|
} |
|
i++ |
|
} |
|
} |
|
|
|
for _, l := range el { |
|
m, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value) |
|
if err != nil { |
|
panic(err) |
|
} |
|
ms = append(ms, m) |
|
} |
|
return ms, el |
|
} |
|
|
|
// LabelValues implements storage.Querier and is a noop. |
|
func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) { |
|
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 |
|
return nil, nil, errors.New("not implemented") |
|
} |
|
|
|
// LabelNames implements storage.Querier and is a noop. |
|
func (q *querier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, error) { |
|
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 |
|
return nil, nil, errors.New("not implemented") |
|
} |
|
|
|
// Close implements storage.Querier and is a noop. |
|
func (q *querier) Close() error { |
|
return nil |
|
} |
|
|
|
// chunkQuerier is an adapter to make a client usable as a storage.ChunkQuerier. |
|
type chunkQuerier struct { |
|
querier |
|
} |
|
|
|
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client. |
|
// It uses remote.querier.Select so it supports external labels and required matchers if specified. |
|
func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { |
|
// TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket). |
|
return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...)) |
|
} |
|
|
|
func newSeriesSetFilter(ss storage.SeriesSet, toFilter labels.Labels) storage.SeriesSet { |
|
return &seriesSetFilter{ |
|
SeriesSet: ss, |
|
toFilter: toFilter, |
|
} |
|
} |
|
|
|
type seriesSetFilter struct { |
|
storage.SeriesSet |
|
toFilter labels.Labels |
|
querier storage.Querier |
|
} |
|
|
|
func (ssf *seriesSetFilter) GetQuerier() storage.Querier { |
|
return ssf.querier |
|
} |
|
|
|
func (ssf *seriesSetFilter) SetQuerier(querier storage.Querier) { |
|
ssf.querier = querier |
|
} |
|
|
|
func (ssf seriesSetFilter) At() storage.Series { |
|
return seriesFilter{ |
|
Series: ssf.SeriesSet.At(), |
|
toFilter: ssf.toFilter, |
|
} |
|
} |
|
|
|
type seriesFilter struct { |
|
storage.Series |
|
toFilter labels.Labels |
|
} |
|
|
|
func (sf seriesFilter) Labels() labels.Labels { |
|
labels := sf.Series.Labels() |
|
for i, j := 0, 0; i < len(labels) && j < len(sf.toFilter); { |
|
if labels[i].Name < sf.toFilter[j].Name { |
|
i++ |
|
} else if labels[i].Name > sf.toFilter[j].Name { |
|
j++ |
|
} else { |
|
labels = labels[:i+copy(labels[i:], labels[i+1:])] |
|
j++ |
|
} |
|
} |
|
return labels |
|
}
|
|
|