diff --git a/main.go b/main.go index bc092239d..4ac6ecb7a 100644 --- a/main.go +++ b/main.go @@ -167,16 +167,9 @@ func NewPrometheus() *prometheus { PrometheusURL: web.MustBuildServerURL(*pathPrefix), PathPrefix: *pathPrefix, }) - for _, rf := range conf.Global.GetRuleFile() { - query, err := queryEngine.NewQueryFromFile(rf) - if err != nil { - glog.Errorf("Error loading rule file %q: %s", rf, err) - os.Exit(1) - } - if res := query.Exec(); res.Err != nil { - glog.Errorf("Error initializing rules: %s", res.Err) - os.Exit(1) - } + if err := ruleManager.LoadRuleFiles(conf.Global.GetRuleFile()...); err != nil { + glog.Errorf("Error loading rule files: %s", err) + os.Exit(1) } flags := map[string]string{} diff --git a/promql/engine.go b/promql/engine.go index a149de0ad..945eca6c7 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -16,11 +16,9 @@ package promql import ( "flag" "fmt" - "io/ioutil" "math" "runtime" "sort" - "sync" "time" "golang.org/x/net/context" @@ -165,14 +163,10 @@ type ( ErrQueryTimeout string // ErrQueryCanceled is returned if a query was canceled during processing. ErrQueryCanceled string - // ErrNoHandlers is returned if no handlers were registered for the - // execution of a statement. - ErrNoHandlers string ) func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", e) } func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", e) } -func (e ErrNoHandlers) Error() string { return fmt.Sprintf("no handlers registered to process %s", e) } // A Query is derived from an a raw query string and can be run against an engine // it is associated with. @@ -193,9 +187,6 @@ type query struct { q string // Statements of the parsed query. stmts Statements - // On finished execution two bools indicating success of the execution - // are sent on the channel. - done chan bool // Timer stats for the query execution. stats *stats.TimerGroup // Cancelation function for the query. @@ -231,15 +222,6 @@ func (q *query) Exec() *Result { return &Result{Err: err, Value: res} } -type ( - // AlertHandlers can be registered with an engine and are called on - // each executed alert statement. - AlertHandler func(context.Context, *AlertStmt) error - // RecordHandlers can be registered with an engine and are called on - // each executed record statement. - RecordHandler func(context.Context, *RecordStmt) error -) - // contextDone returns an error if the context was canceled or timed out. func contextDone(ctx context.Context, env string) error { select { @@ -258,32 +240,24 @@ func contextDone(ctx context.Context, env string) error { } } -// Engine handles the liftetime of queries from beginning to end. It is connected -// to a storage. +// Engine handles the liftetime of queries from beginning to end. +// It is connected to a storage. type Engine struct { - sync.RWMutex - // The storage on which the engine operates. storage local.Storage // The base context for all queries and its cancellation function. baseCtx context.Context cancelQueries func() - - // Handlers for the statements. - alertHandlers map[string]AlertHandler - recordHandlers map[string]RecordHandler } // NewEngine returns a new engine. func NewEngine(storage local.Storage) *Engine { ctx, cancel := context.WithCancel(context.Background()) return &Engine{ - storage: storage, - baseCtx: ctx, - cancelQueries: cancel, - alertHandlers: map[string]AlertHandler{}, - recordHandlers: map[string]RecordHandler{}, + storage: storage, + baseCtx: ctx, + cancelQueries: cancel, } } @@ -292,31 +266,6 @@ func (ng *Engine) Stop() { ng.cancelQueries() } -// NewQuery returns a new query of the given query string. -func (ng *Engine) NewQuery(qs string) (Query, error) { - stmts, err := ParseStmts(qs) - if err != nil { - return nil, err - } - query := &query{ - q: qs, - stmts: stmts, - ng: ng, - done: make(chan bool, 2), - stats: stats.NewTimerGroup(), - } - return query, nil -} - -// NewQueryFromFile reads a file and returns a query of statements it contains. -func (ng *Engine) NewQueryFromFile(filename string) (Query, error) { - content, err := ioutil.ReadFile(filename) - if err != nil { - return nil, err - } - return ng.NewQuery(string(content)) -} - // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(es string, ts clientmodel.Timestamp) (Query, error) { return ng.NewRangeQuery(es, ts, ts, 0) @@ -336,77 +285,64 @@ func (ng *Engine) NewRangeQuery(qs string, start, end clientmodel.Timestamp, int Interval: interval, } - query := &query{ + qry := &query{ q: qs, stmts: Statements{es}, ng: ng, - done: make(chan bool, 2), stats: stats.NewTimerGroup(), } - return query, nil + return qry, nil } -// exec executes all statements in the query. For evaluation statements only -// one statement per query is allowed, after which the execution returns. +// testStmt is an internal helper statement that allows execution +// of an arbitrary function during handling. It is used to test the Engine. +type testStmt func(context.Context) error + +func (testStmt) String() string { return "test statement" } +func (testStmt) DotGraph() string { return "test statement" } +func (testStmt) stmt() {} + +func (ng *Engine) newTestQuery(stmts ...Statement) Query { + qry := &query{ + q: "test statement", + stmts: Statements(stmts), + ng: ng, + stats: stats.NewTimerGroup(), + } + return qry +} + +// exec executes the query. +// +// At this point per query only one EvalStmt is evaluated. Alert and record +// statements are not handled by the Engine. func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { const env = "query execution" // Cancel when execution is done or an error was raised. defer q.cancel() - // The base context might already be canceled (e.g. during shutdown). - if err := contextDone(ctx, env); err != nil { - return nil, err - } - evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start() defer evalTimer.Stop() - ng.RLock() - alertHandlers := []AlertHandler{} - for _, h := range ng.alertHandlers { - alertHandlers = append(alertHandlers, h) - } - recordHandlers := []RecordHandler{} - for _, h := range ng.recordHandlers { - recordHandlers = append(recordHandlers, h) - } - ng.RUnlock() - for _, stmt := range q.stmts { + // The base context might already be canceled on the first iteration (e.g. during shutdown). + if err := contextDone(ctx, env); err != nil { + return nil, err + } + switch s := stmt.(type) { - case *AlertStmt: - if len(alertHandlers) == 0 { - return nil, ErrNoHandlers("alert statement") - } - for _, h := range alertHandlers { - if err := contextDone(ctx, env); err != nil { - return nil, err - } - err := h(ctx, s) - if err != nil { - return nil, err - } - } - case *RecordStmt: - if len(recordHandlers) == 0 { - return nil, ErrNoHandlers("record statement") - } - for _, h := range recordHandlers { - if err := contextDone(ctx, env); err != nil { - return nil, err - } - err := h(ctx, s) - if err != nil { - return nil, err - } - } case *EvalStmt: // Currently, only one execution statement per query is allowed. return ng.execEvalStmt(ctx, q, s) + case testStmt: + if err := s(ctx); err != nil { + return nil, err + } + default: - panic(fmt.Errorf("statement of unknown type %T", stmt)) + panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", stmt)) } } return nil, nil @@ -1050,34 +986,6 @@ func (ev *evaluator) aggregation(op itemType, grouping clientmodel.LabelNames, k return resultVector } -// RegisterAlertHandler registers a new alert handler of the given name. -func (ng *Engine) RegisterAlertHandler(name string, h AlertHandler) { - ng.Lock() - ng.alertHandlers[name] = h - ng.Unlock() -} - -// RegisterRecordHandler registers a new record handler of the given name. -func (ng *Engine) RegisterRecordHandler(name string, h RecordHandler) { - ng.Lock() - ng.recordHandlers[name] = h - ng.Unlock() -} - -// UnregisterAlertHandler removes the alert handler with the given name. -func (ng *Engine) UnregisterAlertHandler(name string) { - ng.Lock() - delete(ng.alertHandlers, name) - ng.Unlock() -} - -// UnregisterRecordHandler removes the record handler with the given name. -func (ng *Engine) UnregisterRecordHandler(name string) { - ng.Lock() - delete(ng.recordHandlers, name) - ng.Unlock() -} - // btos returns 1 if b is true, 0 otherwise. func btos(b bool) clientmodel.SampleValue { if b { diff --git a/promql/engine_test.go b/promql/engine_test.go index 35e9188af..09b175385 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1,7 +1,6 @@ package promql import ( - "reflect" "sync" "testing" "time" @@ -11,6 +10,10 @@ import ( "github.com/prometheus/prometheus/storage/local" ) +var noop = testStmt(func(context.Context) error { + return nil +}) + func TestQueryTimeout(t *testing.T) { *defaultQueryTimeout = 5 * time.Millisecond defer func() { @@ -24,23 +27,14 @@ func TestQueryTimeout(t *testing.T) { engine := NewEngine(storage) defer engine.Stop() - query, err := engine.NewQuery("foo = bar") - if err != nil { - t.Fatalf("error parsing query: %s", err) - } + f1 := testStmt(func(context.Context) error { + time.Sleep(10 * time.Millisecond) + return nil + }) // Timeouts are not exact but checked in designated places. For example between - // invoking handlers. Thus, we reigster two handlers that take some time to ensure we check - // after exceeding the timeout. - // Should the implementation of this area change, the test might have to be adjusted. - engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error { - time.Sleep(10 * time.Millisecond) - return nil - }) - engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error { - time.Sleep(10 * time.Millisecond) - return nil - }) + // invoking test statements. + query := engine.newTestQuery(f1, f1) res := query.Exec() if res.Err == nil { @@ -58,26 +52,16 @@ func TestQueryCancel(t *testing.T) { engine := NewEngine(storage) defer engine.Stop() - query1, err := engine.NewQuery("foo = bar") - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - query2, err := engine.NewQuery("foo = baz") - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - // As for timeouts, cancellation is only checked at designated points. We ensure // that we reach one of those points using the same method. - engine.RegisterRecordHandler("test1", func(context.Context, *RecordStmt) error { - <-time.After(2 * time.Millisecond) - return nil - }) - engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error { - <-time.After(2 * time.Millisecond) + f1 := testStmt(func(context.Context) error { + time.Sleep(2 * time.Millisecond) return nil }) + query1 := engine.newTestQuery(f1, f1) + query2 := engine.newTestQuery(f1, f1) + // Cancel query after starting it. var wg sync.WaitGroup var res *Result @@ -87,7 +71,7 @@ func TestQueryCancel(t *testing.T) { res = query1.Exec() wg.Done() }() - <-time.After(1 * time.Millisecond) + time.Sleep(1 * time.Millisecond) query1.Cancel() wg.Wait() @@ -112,34 +96,20 @@ func TestEngineShutdown(t *testing.T) { engine := NewEngine(storage) - query1, err := engine.NewQuery("foo = bar") - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - query2, err := engine.NewQuery("foo = baz") - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - handlerExecutions := 0 - // Shutdown engine on first handler execution. Should handler execution ever become // concurrent this test has to be adjusted accordingly. - engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error { - handlerExecutions++ - engine.Stop() - time.Sleep(10 * time.Millisecond) - return nil - }) - engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error { + f1 := testStmt(func(context.Context) error { handlerExecutions++ engine.Stop() time.Sleep(10 * time.Millisecond) return nil }) + query1 := engine.newTestQuery(f1, f1) + query2 := engine.newTestQuery(f1, f1) - // Stopping the engine should cancel the base context. While setting up queries is - // still possible their context is canceled from the beginning and execution should + // Stopping the engine must cancel the base context. While executing queries is + // still possible, their context is canceled from the beginning and execution should // terminate immediately. res := query1.Exec() @@ -147,7 +117,7 @@ func TestEngineShutdown(t *testing.T) { t.Fatalf("expected error on shutdown during query but got none") } if handlerExecutions != 1 { - t.Fatalf("expected only one handler to be executed before query cancellation but got %d executons", handlerExecutions) + t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions) } res2 := query2.Exec() @@ -159,114 +129,3 @@ func TestEngineShutdown(t *testing.T) { } } - -func TestAlertHandler(t *testing.T) { - storage, closer := local.NewTestStorage(t, 1) - defer closer.Close() - - engine := NewEngine(storage) - defer engine.Stop() - - qs := `ALERT Foo IF bar FOR 5m WITH {a="b"} SUMMARY "sum" DESCRIPTION "desc"` - - doQuery := func(expectFailure bool) *AlertStmt { - query, err := engine.NewQuery(qs) - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - res := query.Exec() - if expectFailure && res.Err == nil { - t.Fatalf("expected error but got none.") - } - if res.Err != nil && !expectFailure { - t.Fatalf("error on executing alert query: %s", res.Err) - } - // That this alert statement is correct is tested elsewhere. - return query.Statements()[0].(*AlertStmt) - } - - // We expect an error if nothing is registered to handle the query. - alertStmt := doQuery(true) - - receivedCalls := 0 - - // Ensure that we receive the correct statement. - engine.RegisterAlertHandler("test", func(ctx context.Context, as *AlertStmt) error { - if !reflect.DeepEqual(alertStmt, as) { - t.Errorf("received alert statement did not match input: %q", qs) - t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(alertStmt), Tree(as)) - } - receivedCalls++ - return nil - }) - - for i := 0; i < 10; i++ { - doQuery(false) - if receivedCalls != i+1 { - t.Fatalf("alert handler was not called on query execution") - } - } - - engine.UnregisterAlertHandler("test") - - // We must receive no further calls after unregistering. - doQuery(true) - if receivedCalls != 10 { - t.Fatalf("received calls after unregistering alert handler") - } -} - -func TestRecordHandler(t *testing.T) { - storage, closer := local.NewTestStorage(t, 1) - defer closer.Close() - - engine := NewEngine(storage) - defer engine.Stop() - - qs := `foo = bar` - - doQuery := func(expectFailure bool) *RecordStmt { - query, err := engine.NewQuery(qs) - if err != nil { - t.Fatalf("error parsing query: %s", err) - } - res := query.Exec() - if expectFailure && res.Err == nil { - t.Fatalf("expected error but got none.") - } - if res.Err != nil && !expectFailure { - t.Fatalf("error on executing record query: %s", res.Err) - } - return query.Statements()[0].(*RecordStmt) - } - - // We expect an error if nothing is registered to handle the query. - recordStmt := doQuery(true) - - receivedCalls := 0 - - // Ensure that we receive the correct statement. - engine.RegisterRecordHandler("test", func(ctx context.Context, rs *RecordStmt) error { - if !reflect.DeepEqual(recordStmt, rs) { - t.Errorf("received record statement did not match input: %q", qs) - t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(recordStmt), Tree(rs)) - } - receivedCalls++ - return nil - }) - - for i := 0; i < 10; i++ { - doQuery(false) - if receivedCalls != i+1 { - t.Fatalf("record handler was not called on query execution") - } - } - - engine.UnregisterRecordHandler("test") - - // We must receive no further calls after unregistering. - doQuery(true) - if receivedCalls != 10 { - t.Fatalf("received calls after unregistering record handler") - } -} diff --git a/rules/manager.go b/rules/manager.go index 3b2bed6ee..fe6b12b00 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -15,12 +15,12 @@ package rules import ( "fmt" + "io/ioutil" "sync" "time" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/net/context" clientmodel "github.com/prometheus/client_golang/model" @@ -113,9 +113,6 @@ func NewManager(o *ManagerOptions) *Manager { notificationHandler: o.NotificationHandler, prometheusURL: o.PrometheusURL, } - manager.queryEngine.RegisterAlertHandler("rule_manager", manager.AddAlertingRule) - manager.queryEngine.RegisterRecordHandler("rule_manager", manager.AddRecordingRule) - return manager } @@ -258,24 +255,37 @@ func (m *Manager) runIteration() { wg.Wait() } -func (m *Manager) AddAlertingRule(ctx context.Context, r *promql.AlertStmt) error { - rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description) - +// LoadRuleFiles loads alerting and recording rules from the given files. +func (m *Manager) LoadRuleFiles(filenames ...string) error { m.Lock() - m.rules = append(m.rules, rule) - m.Unlock() - return nil -} - -func (m *Manager) AddRecordingRule(ctx context.Context, r *promql.RecordStmt) error { - rule := &RecordingRule{r.Name, r.Expr, r.Labels} - - m.Lock() - m.rules = append(m.rules, rule) - m.Unlock() + defer m.Unlock() + + for _, fn := range filenames { + content, err := ioutil.ReadFile(fn) + if err != nil { + return err + } + stmts, err := promql.ParseStmts(string(content)) + if err != nil { + return fmt.Errorf("error parsing %s: %s", fn, err) + } + for _, stmt := range stmts { + switch r := stmt.(type) { + case *promql.AlertStmt: + rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description) + m.rules = append(m.rules, rule) + case *promql.RecordStmt: + rule := &RecordingRule{r.Name, r.Expr, r.Labels} + m.rules = append(m.rules, rule) + default: + panic("retrieval.Manager.LoadRuleFiles: unknown statement type") + } + } + } return nil } +// Rules returns the list of the manager's rules. func (m *Manager) Rules() []Rule { m.Lock() defer m.Unlock() @@ -285,6 +295,7 @@ func (m *Manager) Rules() []Rule { return rules } +// AlertingRules returns the list of the manager's alerting rules. func (m *Manager) AlertingRules() []*AlertingRule { m.Lock() defer m.Unlock()