mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
212 lines
4.7 KiB
212 lines
4.7 KiB
package promql |
|
|
|
import ( |
|
"fmt" |
|
"testing" |
|
"time" |
|
|
|
"golang.org/x/net/context" |
|
) |
|
|
|
var noop = testStmt(func(context.Context) error { |
|
return nil |
|
}) |
|
|
|
func TestQueryConcurrency(t *testing.T) { |
|
engine := NewEngine(nil, nil) |
|
defer engine.Stop() |
|
|
|
block := make(chan struct{}) |
|
processing := make(chan struct{}) |
|
|
|
f := func(context.Context) error { |
|
processing <- struct{}{} |
|
<-block |
|
return nil |
|
} |
|
|
|
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { |
|
q := engine.newTestQuery(f) |
|
go q.Exec() |
|
select { |
|
case <-processing: |
|
// Expected. |
|
case <-time.After(20 * time.Millisecond): |
|
t.Fatalf("Query within concurrency threshold not being executed") |
|
} |
|
} |
|
|
|
q := engine.newTestQuery(f) |
|
go q.Exec() |
|
|
|
select { |
|
case <-processing: |
|
t.Fatalf("Query above concurrency threhosld being executed") |
|
case <-time.After(20 * time.Millisecond): |
|
// Expected. |
|
} |
|
|
|
// Terminate a running query. |
|
block <- struct{}{} |
|
|
|
select { |
|
case <-processing: |
|
// Expected. |
|
case <-time.After(20 * time.Millisecond): |
|
t.Fatalf("Query within concurrency threshold not being executed") |
|
} |
|
|
|
// Terminate remaining queries. |
|
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { |
|
block <- struct{}{} |
|
} |
|
} |
|
|
|
func TestQueryTimeout(t *testing.T) { |
|
engine := NewEngine(nil, &EngineOptions{ |
|
Timeout: 5 * time.Millisecond, |
|
MaxConcurrentQueries: 20, |
|
}) |
|
defer engine.Stop() |
|
|
|
query := engine.newTestQuery(func(ctx context.Context) error { |
|
time.Sleep(50 * time.Millisecond) |
|
return contextDone(ctx, "test statement execution") |
|
}) |
|
|
|
res := query.Exec() |
|
if res.Err == nil { |
|
t.Fatalf("expected timeout error but got none") |
|
} |
|
if _, ok := res.Err.(ErrQueryTimeout); res.Err != nil && !ok { |
|
t.Fatalf("expected timeout error but got: %s", res.Err) |
|
} |
|
} |
|
|
|
func TestQueryCancel(t *testing.T) { |
|
engine := NewEngine(nil, nil) |
|
defer engine.Stop() |
|
|
|
// Cancel a running query before it completes. |
|
block := make(chan struct{}) |
|
processing := make(chan struct{}) |
|
|
|
query1 := engine.newTestQuery(func(ctx context.Context) error { |
|
processing <- struct{}{} |
|
<-block |
|
return contextDone(ctx, "test statement execution") |
|
}) |
|
|
|
var res *Result |
|
|
|
go func() { |
|
res = query1.Exec() |
|
processing <- struct{}{} |
|
}() |
|
|
|
<-processing |
|
query1.Cancel() |
|
block <- struct{}{} |
|
<-processing |
|
|
|
if res.Err == nil { |
|
t.Fatalf("expected cancellation error for query1 but got none") |
|
} |
|
if ee := ErrQueryCanceled("test statement execution"); res.Err != ee { |
|
t.Fatalf("expected error %q, got %q", ee, res.Err) |
|
} |
|
|
|
// Canceling a query before starting it must have no effect. |
|
query2 := engine.newTestQuery(func(ctx context.Context) error { |
|
return contextDone(ctx, "test statement execution") |
|
}) |
|
|
|
query2.Cancel() |
|
res = query2.Exec() |
|
if res.Err != nil { |
|
t.Fatalf("unexpeceted error on executing query2: %s", res.Err) |
|
} |
|
} |
|
|
|
func TestEngineShutdown(t *testing.T) { |
|
engine := NewEngine(nil, nil) |
|
|
|
block := make(chan struct{}) |
|
processing := make(chan struct{}) |
|
|
|
// Shutdown engine on first handler execution. Should handler execution ever become |
|
// concurrent this test has to be adjusted accordingly. |
|
f := func(ctx context.Context) error { |
|
processing <- struct{}{} |
|
<-block |
|
return contextDone(ctx, "test statement execution") |
|
} |
|
query1 := engine.newTestQuery(f) |
|
|
|
// Stopping the engine must cancel the base context. While executing queries is |
|
// still possible, their context is canceled from the beginning and execution should |
|
// terminate immediately. |
|
|
|
var res *Result |
|
go func() { |
|
res = query1.Exec() |
|
processing <- struct{}{} |
|
}() |
|
|
|
<-processing |
|
engine.Stop() |
|
block <- struct{}{} |
|
<-processing |
|
|
|
if res.Err == nil { |
|
t.Fatalf("expected error on shutdown during query but got none") |
|
} |
|
if ee := ErrQueryCanceled("test statement execution"); res.Err != ee { |
|
t.Fatalf("expected error %q, got %q", ee, res.Err) |
|
} |
|
|
|
query2 := engine.newTestQuery(func(context.Context) error { |
|
t.Fatalf("reached query execution unexpectedly") |
|
return nil |
|
}) |
|
|
|
// The second query is started after the engine shut down. It must |
|
// be canceled immediately. |
|
res2 := query2.Exec() |
|
if res2.Err == nil { |
|
t.Fatalf("expected error on querying shutdown engine but got none") |
|
} |
|
if _, ok := res2.Err.(ErrQueryCanceled); !ok { |
|
t.Fatalf("expected cancelation error, got %q", res2.Err) |
|
} |
|
} |
|
|
|
func TestRecoverEvaluatorRuntime(t *testing.T) { |
|
var ev *evaluator |
|
var err error |
|
defer ev.recover(&err) |
|
|
|
// Cause a runtime panic. |
|
var a []int |
|
a[123] = 1 |
|
|
|
if err.Error() != "unexpected error" { |
|
t.Fatalf("wrong error message: %q, expected %q", err, "unexpected error") |
|
} |
|
} |
|
|
|
func TestRecoverEvaluatorError(t *testing.T) { |
|
var ev *evaluator |
|
var err error |
|
|
|
e := fmt.Errorf("custom error") |
|
|
|
defer func() { |
|
if err.Error() != e.Error() { |
|
t.Fatalf("wrong error message: %q, expected %q", err, e) |
|
} |
|
}() |
|
defer ev.recover(&err) |
|
|
|
panic(e) |
|
}
|
|
|