mirror of https://github.com/prometheus/prometheus
Make rules only read local data
parent
94acd3f1d8
commit
8fda83ea12
|
@ -114,7 +114,7 @@ func Main() int {
|
|||
SampleAppender: sampleAppender,
|
||||
Notifier: notifier,
|
||||
QueryEngine: queryEngine,
|
||||
Context: ctx,
|
||||
Context: fanin.WithLocalOnly(ctx),
|
||||
ExternalURL: cfg.web.ExternalURL,
|
||||
})
|
||||
|
||||
|
|
|
@ -26,6 +26,20 @@ import (
|
|||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
type contextKey string
|
||||
|
||||
const ctxLocalOnly contextKey = "local-only"
|
||||
|
||||
// WithLocalOnly decorates a context to indicate that a query should
|
||||
// only be executed against local data.
|
||||
func WithLocalOnly(ctx context.Context) context.Context {
|
||||
return context.WithValue(ctx, ctxLocalOnly, struct{}{})
|
||||
}
|
||||
|
||||
func localOnly(ctx context.Context) bool {
|
||||
return ctx.Value(ctxLocalOnly) == struct{}{}
|
||||
}
|
||||
|
||||
// Queryable is a local.Queryable that reads from local and remote storage.
|
||||
type Queryable struct {
|
||||
Local promql.Queryable
|
||||
|
@ -52,25 +66,24 @@ type querier struct {
|
|||
}
|
||||
|
||||
func (q querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
|
||||
return q.query(func(q local.Querier) ([]local.SeriesIterator, error) {
|
||||
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
|
||||
return q.QueryRange(ctx, from, through, matchers...)
|
||||
})
|
||||
}
|
||||
|
||||
func (q querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
|
||||
return q.query(func(q local.Querier) ([]local.SeriesIterator, error) {
|
||||
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
|
||||
return q.QueryInstant(ctx, ts, stalenessDelta, matchers...)
|
||||
})
|
||||
}
|
||||
|
||||
func (q querier) query(qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) {
|
||||
func (q querier) query(ctx context.Context, qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) {
|
||||
localIts, err := qFn(q.local)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(q.remotes) == 0 {
|
||||
// Skip merge logic if there are no remote queriers.
|
||||
if len(q.remotes) == 0 || localOnly(ctx) {
|
||||
return localIts, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -111,9 +111,10 @@ func (q testQuerier) Close() error {
|
|||
|
||||
func TestQueryRange(t *testing.T) {
|
||||
type query struct {
|
||||
from model.Time
|
||||
through model.Time
|
||||
out model.Matrix
|
||||
from model.Time
|
||||
through model.Time
|
||||
out model.Matrix
|
||||
localOnly bool
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
|
@ -485,6 +486,105 @@ func TestQueryRange(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
name: "context value to indicate only local querying is set",
|
||||
local: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Timestamp: 2,
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Timestamp: 3,
|
||||
Value: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
remote: []model.Matrix{
|
||||
model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Timestamp: 0,
|
||||
Value: 0,
|
||||
},
|
||||
{
|
||||
Timestamp: 1,
|
||||
Value: 1,
|
||||
},
|
||||
{
|
||||
Timestamp: 2,
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
queries: []query{
|
||||
{
|
||||
from: 0,
|
||||
through: 3,
|
||||
localOnly: true,
|
||||
out: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Timestamp: 2,
|
||||
Value: 2,
|
||||
},
|
||||
{
|
||||
Timestamp: 3,
|
||||
Value: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
from: 1,
|
||||
through: 1,
|
||||
localOnly: true,
|
||||
out: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
},
|
||||
Values: []model.SamplePair{model.ZeroSamplePair},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
from: 2,
|
||||
through: 2,
|
||||
localOnly: true,
|
||||
out: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
model.MetricNameLabel: "testmetric",
|
||||
},
|
||||
Values: []model.SamplePair{
|
||||
{
|
||||
Timestamp: 2,
|
||||
Value: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
matcher, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "testmetric")
|
||||
|
@ -501,12 +601,16 @@ func TestQueryRange(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, query := range test.queries {
|
||||
ctx := context.Background()
|
||||
if query.localOnly {
|
||||
ctx = WithLocalOnly(ctx)
|
||||
}
|
||||
var its []local.SeriesIterator
|
||||
var err error
|
||||
if query.from == query.through {
|
||||
its, err = q.QueryInstant(context.Background(), query.from, 5*time.Minute, matcher)
|
||||
its, err = q.QueryInstant(ctx, query.from, 5*time.Minute, matcher)
|
||||
} else {
|
||||
its, err = q.QueryRange(context.Background(), query.from, query.through, matcher)
|
||||
its, err = q.QueryRange(ctx, query.from, query.through, matcher)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
Loading…
Reference in New Issue