Make AST query storage non-global.

pull/224/head
Julius Volz 2013-05-07 13:15:10 +02:00
parent 4872ec7577
commit 56324d8ce2
12 changed files with 63 additions and 69 deletions

View File

@ -131,9 +131,7 @@ func main() {
go ts.Serve() go ts.Serve()
go prometheus.interruptHandler() go prometheus.interruptHandler()
ast.SetStorage(*ts) ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts)
ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval())
err = ruleManager.AddRulesFromConfig(conf) err = ruleManager.AddRulesFromConfig(conf)
if err != nil { if err != nil {
log.Fatalf("Error loading rule files: %v", err) log.Fatalf("Error loading rule files: %v", err)

View File

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"time" "time"
) )
@ -87,13 +88,13 @@ type AlertingRule struct {
func (rule AlertingRule) Name() string { return rule.name } func (rule AlertingRule) Name() string { return rule.name }
func (rule AlertingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) { func (rule AlertingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, timestamp) return ast.EvalVectorInstant(rule.vector, timestamp, storage)
} }
func (rule AlertingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) { func (rule AlertingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
// Get the raw value of the rule expression. // Get the raw value of the rule expression.
exprResult, err := rule.EvalRaw(timestamp) exprResult, err := rule.EvalRaw(timestamp, storage)
if err != nil { if err != nil {
return return
} }

View File

@ -17,6 +17,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/metric"
"log" "log"
"math" "math"
"sort" "sort"
@ -258,8 +259,8 @@ func labelsToKey(labels model.Metric) string {
return strings.Join(keyParts, ",") // TODO not safe when label value contains comma. return strings.Join(keyParts, ",") // TODO not safe when label value contains comma.
} }
func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector, err error) { func EvalVectorInstant(node VectorNode, timestamp time.Time, storage *metric.TieredStorage) (vector Vector, err error) {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage)
if err != nil { if err != nil {
return return
} }
@ -267,12 +268,12 @@ func EvalVectorInstant(node VectorNode, timestamp time.Time) (vector Vector, err
return return
} }
func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration) (matrix Matrix, err error) { func EvalVectorRange(node VectorNode, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (matrix Matrix, err error) {
// Explicitly initialize to an empty matrix since a nil Matrix encodes to // Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON. // null in JSON.
matrix = Matrix{} matrix = Matrix{}
viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval) viewAdapter, err := viewAdapterForRangeQuery(node, start, end, interval, storage)
if err != nil { if err != nil {
return return
} }

View File

@ -22,12 +22,6 @@ import (
var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.") var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default staleness delta allowance in seconds during expression evaluations.")
// AST-global storage to use for operations that are not supported by views
// (i.e. metric->fingerprint lookups).
//
// BUG(julius): Wrap this into non-global state.
var queryStorage *metric.TieredStorage
// Describes the lenience limits to apply to values from the materialized view. // Describes the lenience limits to apply to values from the materialized view.
type StalenessPolicy struct { type StalenessPolicy struct {
// Describes the inclusive limit at which individual points if requested will // Describes the inclusive limit at which individual points if requested will
@ -36,8 +30,15 @@ type StalenessPolicy struct {
} }
type viewAdapter struct { type viewAdapter struct {
view metric.View // Policy that dictates when sample values around an evaluation time are to
// be interpreted as stale.
stalenessPolicy StalenessPolicy stalenessPolicy StalenessPolicy
// AST-global storage to use for operations that are not supported by views
// (i.e. fingerprint->metric lookups).
storage *metric.TieredStorage
// The materialized view which contains all timeseries data required for
// executing a query.
view metric.View
} }
// interpolateSamples interpolates a value at a target time between two // interpolateSamples interpolates a value at a target time between two
@ -109,7 +110,7 @@ func (v *viewAdapter) GetValueAtTime(fingerprints model.Fingerprints, timestamp
for _, fingerprint := range fingerprints { for _, fingerprint := range fingerprints {
sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp) sampleCandidates := v.view.GetValueAtTime(fingerprint, timestamp)
samplePair := v.chooseClosestSample(sampleCandidates, timestamp) samplePair := v.chooseClosestSample(sampleCandidates, timestamp)
m, err := queryStorage.GetMetricForFingerprint(fingerprint) m, err := v.storage.GetMetricForFingerprint(fingerprint)
if err != nil { if err != nil {
continue continue
} }
@ -133,7 +134,7 @@ func (v *viewAdapter) GetBoundaryValues(fingerprints model.Fingerprints, interva
} }
// TODO: memoize/cache this. // TODO: memoize/cache this.
m, err := queryStorage.GetMetricForFingerprint(fingerprint) m, err := v.storage.GetMetricForFingerprint(fingerprint)
if err != nil { if err != nil {
continue continue
} }
@ -155,7 +156,7 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *
} }
// TODO: memoize/cache this. // TODO: memoize/cache this.
m, err := queryStorage.GetMetricForFingerprint(fingerprint) m, err := v.storage.GetMetricForFingerprint(fingerprint)
if err != nil { if err != nil {
continue continue
} }
@ -169,17 +170,14 @@ func (v *viewAdapter) GetRangeValues(fingerprints model.Fingerprints, interval *
return sampleSets, nil return sampleSets, nil
} }
func SetStorage(storage metric.TieredStorage) { func NewViewAdapter(view metric.View, storage *metric.TieredStorage) *viewAdapter {
queryStorage = &storage
}
func NewViewAdapter(view metric.View) *viewAdapter {
stalenessPolicy := StalenessPolicy{ stalenessPolicy := StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
} }
return &viewAdapter{ return &viewAdapter{
view: view,
stalenessPolicy: stalenessPolicy, stalenessPolicy: stalenessPolicy,
storage: storage,
view: view,
} }
} }

View File

@ -17,6 +17,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"sort" "sort"
"strings" "strings"
@ -151,8 +152,8 @@ func TypedValueToJSON(data interface{}, typeStr string) string {
return string(dataJSON) return string(dataJSON)
} }
func EvalToString(node Node, timestamp time.Time, format OutputFormat) string { func EvalToString(node Node, timestamp time.Time, format OutputFormat, storage *metric.TieredStorage) string {
viewAdapter, err := viewAdapterForInstantQuery(node, timestamp) viewAdapter, err := viewAdapterForInstantQuery(node, timestamp, storage)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -35,33 +35,23 @@ type QueryAnalyzer struct {
FullRanges FullRangeMap FullRanges FullRangeMap
// Interval ranges always implicitly span the whole query interval. // Interval ranges always implicitly span the whole query interval.
IntervalRanges IntervalRangeMap IntervalRanges IntervalRangeMap
// The underlying storage to which the query will be applied. Needed for
// extracting timeseries fingerprint information during query analysis.
storage *metric.TieredStorage
} }
func NewQueryAnalyzer() *QueryAnalyzer { func NewQueryAnalyzer(storage *metric.TieredStorage) *QueryAnalyzer {
return &QueryAnalyzer{ return &QueryAnalyzer{
FullRanges: FullRangeMap{}, FullRanges: FullRangeMap{},
IntervalRanges: IntervalRangeMap{}, IntervalRanges: IntervalRangeMap{},
storage: storage,
} }
} }
func minTime(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t1
}
return t2
}
func maxTime(t1, t2 time.Time) time.Time {
if t1.After(t2) {
return t1
}
return t2
}
func (analyzer *QueryAnalyzer) Visit(node Node) { func (analyzer *QueryAnalyzer) Visit(node Node) {
switch n := node.(type) { switch n := node.(type) {
case *VectorLiteral: case *VectorLiteral:
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) fingerprints, err := analyzer.storage.GetFingerprintsForLabelSet(n.labels)
if err != nil { if err != nil {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return return
@ -73,7 +63,7 @@ func (analyzer *QueryAnalyzer) Visit(node Node) {
} }
} }
case *MatrixLiteral: case *MatrixLiteral:
fingerprints, err := queryStorage.GetFingerprintsForLabelSet(n.labels) fingerprints, err := analyzer.storage.GetFingerprintsForLabelSet(n.labels)
if err != nil { if err != nil {
log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err) log.Printf("Error getting fingerprints for labelset %v: %v", n.labels, err)
return return
@ -105,8 +95,8 @@ func (analyzer *QueryAnalyzer) AnalyzeQueries(node Node) {
} }
} }
func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *viewAdapter, err error) { func viewAdapterForInstantQuery(node Node, timestamp time.Time, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
analyzer := NewQueryAnalyzer() analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node) analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder() viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges { for fingerprint, rangeDuration := range analyzer.FullRanges {
@ -115,15 +105,15 @@ func viewAdapterForInstantQuery(node Node, timestamp time.Time) (viewAdapter *vi
for fingerprint := range analyzer.IntervalRanges { for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtTime(fingerprint, timestamp) viewBuilder.GetMetricAtTime(fingerprint, timestamp)
} }
view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil { if err == nil {
viewAdapter = NewViewAdapter(view) viewAdapter = NewViewAdapter(view, storage)
} }
return return
} }
func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration) (viewAdapter *viewAdapter, err error) { func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interval time.Duration, storage *metric.TieredStorage) (viewAdapter *viewAdapter, err error) {
analyzer := NewQueryAnalyzer() analyzer := NewQueryAnalyzer(storage)
analyzer.AnalyzeQueries(node) analyzer.AnalyzeQueries(node)
viewBuilder := metric.NewViewRequestBuilder() viewBuilder := metric.NewViewRequestBuilder()
for fingerprint, rangeDuration := range analyzer.FullRanges { for fingerprint, rangeDuration := range analyzer.FullRanges {
@ -135,9 +125,9 @@ func viewAdapterForRangeQuery(node Node, start time.Time, end time.Time, interva
for fingerprint := range analyzer.IntervalRanges { for fingerprint := range analyzer.IntervalRanges {
viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval) viewBuilder.GetMetricAtInterval(fingerprint, start, end, interval)
} }
view, err := queryStorage.MakeView(viewBuilder, time.Duration(60)*time.Second) view, err := analyzer.storage.MakeView(viewBuilder, time.Duration(60)*time.Second)
if err == nil { if err == nil {
viewAdapter = NewViewAdapter(view) viewAdapter = NewViewAdapter(view, storage)
} }
return return
} }

View File

@ -16,6 +16,7 @@ package rules
import ( import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage/metric"
"log" "log"
"sync" "sync"
"time" "time"
@ -35,14 +36,16 @@ type ruleManager struct {
results chan *Result results chan *Result
done chan bool done chan bool
interval time.Duration interval time.Duration
storage *metric.TieredStorage
} }
func NewRuleManager(results chan *Result, interval time.Duration) RuleManager { func NewRuleManager(results chan *Result, interval time.Duration, storage *metric.TieredStorage) RuleManager {
manager := &ruleManager{ manager := &ruleManager{
results: results, results: results,
rules: []Rule{}, rules: []Rule{},
done: make(chan bool), done: make(chan bool),
interval: interval, interval: interval,
storage: storage,
} }
go manager.run(results) go manager.run(results)
return manager return manager
@ -72,7 +75,7 @@ func (m *ruleManager) runIteration(results chan *Result) {
for _, rule := range m.rules { for _, rule := range m.rules {
wg.Add(1) wg.Add(1)
go func(rule Rule) { go func(rule Rule) {
vector, err := rule.Eval(now) vector, err := rule.Eval(now, m.storage)
samples := model.Samples{} samples := model.Samples{}
for _, sample := range vector { for _, sample := range vector {
samples = append(samples, sample) samples = append(samples, sample)

View File

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/storage/metric"
"time" "time"
) )
@ -30,13 +31,13 @@ type RecordingRule struct {
func (rule RecordingRule) Name() string { return rule.name } func (rule RecordingRule) Name() string { return rule.name }
func (rule RecordingRule) EvalRaw(timestamp time.Time) (vector ast.Vector, err error) { func (rule RecordingRule) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
return ast.EvalVectorInstant(rule.vector, timestamp) return ast.EvalVectorInstant(rule.vector, timestamp, storage)
} }
func (rule RecordingRule) Eval(timestamp time.Time) (vector ast.Vector, err error) { func (rule RecordingRule) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error) {
// Get the raw value of the rule expression. // Get the raw value of the rule expression.
vector, err = rule.EvalRaw(timestamp) vector, err = rule.EvalRaw(timestamp, storage)
if err != nil { if err != nil {
return return
} }

View File

@ -15,6 +15,7 @@ package rules
import ( import (
"github.com/prometheus/prometheus/rules/ast" "github.com/prometheus/prometheus/rules/ast"
"github.com/prometheus/prometheus/storage/metric"
"time" "time"
) )
@ -25,9 +26,9 @@ type Rule interface {
Name() string Name() string
// EvalRaw evaluates the rule's vector expression without triggering any // EvalRaw evaluates the rule's vector expression without triggering any
// other actions, like recording or alerting. // other actions, like recording or alerting.
EvalRaw(timestamp time.Time) (vector ast.Vector, err error) EvalRaw(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
// Eval evaluates the rule, including any associated recording or alerting actions. // Eval evaluates the rule, including any associated recording or alerting actions.
Eval(timestamp time.Time) (vector ast.Vector, err error) Eval(timestamp time.Time, storage *metric.TieredStorage) (vector ast.Vector, err error)
// ToDotGraph returns a Graphviz dot graph of the rule. // ToDotGraph returns a Graphviz dot graph of the rule.
ToDotGraph() string ToDotGraph() string
} }

View File

@ -54,7 +54,6 @@ func newTestStorage(t test.Tester) (storage *metric.TieredStorage, closer test.C
if storage == nil { if storage == nil {
t.Fatal("storage == nil") t.Fatal("storage == nil")
} }
ast.SetStorage(*storage)
storeMatrix(*storage, testMatrix) storeMatrix(*storage, testMatrix)
return return
} }
@ -308,7 +307,7 @@ func ExpressionTests(t *testing.T) {
t.Errorf("%d. Test should fail, but didn't", i) t.Errorf("%d. Test should fail, but didn't", i)
} }
failed := false failed := false
resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT) resultStr := ast.EvalToString(testExpr, testEvalTime, ast.TEXT, tieredStorage)
resultLines := strings.Split(resultStr, "\n") resultLines := strings.Split(resultStr, "\n")
if len(exprTest.output) != len(resultLines) { if len(exprTest.output) != len(resultLines) {
@ -338,7 +337,7 @@ func ExpressionTests(t *testing.T) {
} }
} }
analyzer := ast.NewQueryAnalyzer() analyzer := ast.NewQueryAnalyzer(tieredStorage)
analyzer.AnalyzeQueries(testExpr) analyzer.AnalyzeQueries(testExpr)
if exprTest.fullRanges != len(analyzer.FullRanges) { if exprTest.fullRanges != len(analyzer.FullRanges) {
t.Errorf("%d. Count of full ranges didn't match: %v vs %v", i, exprTest.fullRanges, len(analyzer.FullRanges)) t.Errorf("%d. Count of full ranges didn't match: %v vs %v", i, exprTest.fullRanges, len(analyzer.FullRanges))
@ -464,7 +463,7 @@ func TestAlertingRule(t *testing.T) {
for i, expected := range evalOutputs { for i, expected := range evalOutputs {
evalTime := testStartTime.Add(testSampleInterval * time.Duration(i)) evalTime := testStartTime.Add(testSampleInterval * time.Duration(i))
actual, err := rule.Eval(evalTime) actual, err := rule.Eval(evalTime, tieredStorage)
if err != nil { if err != nil {
t.Fatalf("Error during alerting rule evaluation: %s", err) t.Fatalf("Error during alerting rule evaluation: %s", err)
} }

View File

@ -52,7 +52,7 @@ func (serv MetricsService) Query(expr string, formatJson string) (result string)
rb.SetContentType(gorest.Text_Plain) rb.SetContentType(gorest.Text_Plain)
} }
return ast.EvalToString(exprNode, timestamp, format) return ast.EvalToString(exprNode, timestamp, format, serv.Storage)
} }
func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string { func (serv MetricsService) QueryRange(expr string, end int64, duration int64, step int64) string {
@ -86,7 +86,8 @@ func (serv MetricsService) QueryRange(expr string, end int64, duration int64, st
exprNode.(ast.VectorNode), exprNode.(ast.VectorNode),
time.Unix(end-duration, 0).UTC(), time.Unix(end-duration, 0).UTC(),
time.Unix(end, 0).UTC(), time.Unix(end, 0).UTC(),
time.Duration(step)*time.Second) time.Duration(step)*time.Second,
serv.Storage)
if err != nil { if err != nil {
return ast.ErrorToJSON(err) return ast.ErrorToJSON(err)
} }