@ -292,70 +292,74 @@ func TestCatalogNodes_Blocking(t *testing.T) {
t . Parallel ( )
a := NewTestAgent ( t , t . Name ( ) , "" )
defer a . Shutdown ( )
testrpc . WaitForTestAgent ( t , a . RPC , "dc1" )
testrpc . WaitForTestAgent ( t , a . RPC , "dc1" , testrpc . WaitForAntiEntropySync ( ) )
// Register node
// Run the query
args := & structs . DCSpecificRequest {
Datacenter : "dc1" ,
}
var out structs . IndexedNodes
if err := a . RPC ( "Catalog.ListNodes" , * args , & out ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
// t.Fatal must be called from the main go routine
// of the test. Because of this we cannot call
// t.Fatal from within the go routines and use
// an error channel instead.
errch := make ( chan error , 2 )
// Async cause a change
waitIndex := out . Index
start := time . Now ( )
go func ( ) {
testrpc . WaitForTestAgent ( t , a . RPC , "dc1" )
start := time . Now ( )
// register a service after the blocking call
// in order to unblock it.
time . AfterFunc ( 100 * time . Millisecond , func ( ) {
args := & structs . RegisterRequest {
Datacenter : "dc1" ,
Node : "foo" ,
Address : "127.0.0.1" ,
}
var out struct { }
errch <- a . RPC ( "Catalog.Register" , args , & out )
} )
// now block
req , _ := http . NewRequest ( "GET" , fmt . Sprintf ( "/v1/catalog/nodes?wait=3s&index=%d" , out . Index + 1 ) , nil )
resp := httptest . NewRecorder ( )
obj , err := a . srv . CatalogNodes ( resp , req )
if err != nil {
errch <- err
time . Sleep ( 100 * time . Millisecond )
args := & structs . RegisterRequest {
Datacenter : "dc1" ,
Node : "foo" ,
Address : "127.0.0.1" ,
}
// Should block for a while
if d := time . Since ( start ) ; d < 50 * time . Millisecond {
errch <- fmt . Errorf ( "too fast: %v" , d )
var out struct { }
if err := a . RPC ( "Catalog.Register" , args , & out ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
} ( )
if idx := getIndex ( t , resp ) ; idx <= out . Index {
errch <- fmt . Errorf ( "bad: %v" , idx )
}
const waitDuration = 3 * time . Second
nodes := obj . ( structs . Nodes )
if len ( nodes ) != 2 {
errch <- fmt . Errorf ( "bad: %v" , obj )
// Re-run the query, if errantly woken up with no change, resume blocking.
var elapsed time . Duration
RUN_BLOCKING_QUERY :
req , err := http . NewRequest ( "GET" , fmt . Sprintf ( "/v1/catalog/nodes?wait=%s&index=%d" ,
waitDuration . String ( ) ,
waitIndex ) , nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
resp := httptest . NewRecorder ( )
obj , err := a . srv . CatalogNodes ( resp , req )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
elapsed = time . Since ( start )
idx := getIndex ( t , resp )
if idx < waitIndex {
t . Fatalf ( "bad: %v" , idx )
} else if idx == waitIndex {
if elapsed > waitDuration {
// This should prevent the loop from running longer than the
// waitDuration
t . Fatalf ( "too slow: %v" , elapsed )
}
errch <- nil
} ( )
goto RUN_BLOCKING_QUERY
}
// wait for both go routines to return
if err := <- errch ; err != nil {
t . Fatal ( err )
// Should block at least 100ms before getting the changed results
if elapsed < 100 * time . Millisecond {
t . Fatalf ( "too fast: %v" , elapsed )
}
if err := <- errch ; err != nil {
t . Fatal ( err )
nodes := obj . ( structs . Nodes )
if len ( nodes ) != 2 {
t . Fatalf ( "bad: %v" , obj )
}
}
func TestCatalogNodes_DistanceSort ( t * testing . T ) {