@@ -225,6 +225,367 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
	})
}

func TestAgent_Service(t *testing.T) {
	t.Parallel()

	a := NewTestAgent(t.Name(), TestACLConfig()+`
	services {
		name = "web"
		port = 8181
	}
	`)
	defer a.Shutdown()
	testrpc.WaitForLeader(t, a.RPC, "dc1")

	proxy := structs.TestConnectProxyConfig(t)
	proxy.DestinationServiceID = "web1"

	// Define a valid local sidecar proxy service
	sidecarProxy := &structs.ServiceDefinition{
		Kind: structs.ServiceKindConnectProxy,
		Name: "web-sidecar-proxy",
		Check: structs.CheckType{
			TCP: "127.0.0.1:8000",
			Interval: 10 * time.Second,
		},
		Port: 8000,
		Proxy: &proxy,
	}

	// Define an updated version. Be careful to copy it.
	updatedProxy := *sidecarProxy
	updatedProxy.Port = 9999
	// Mangle the proxy config/upstreams into the expected form with defaults and
	// API struct types.
	expectProxy := proxy
	expectProxy.Upstreams =
		structs.TestAddDefaultsToUpstreams(t, sidecarProxy.Proxy.Upstreams)

	expectedResponse := &api.AgentService{
		Kind: api.ServiceKindConnectProxy,
		ID: "web-sidecar-proxy",
		Service: "web-sidecar-proxy",
		Port: 8000,
		Proxy: expectProxy.ToAPI(),
		ContentHash: "26959a754e182054",
	}

	// Copy and modify
	updatedResponse := *expectedResponse
	updatedResponse.Port = 9999
	updatedResponse.ContentHash = "1bdcf042660b33f6"
	// Simple response for the non-proxy service registered in the TestAgent config
	expectWebResponse := &api.AgentService{
		ID: "web",
		Service: "web",
		Port: 8181,
		ContentHash: "7be2b0411161d3b1",
	}

	tests := []struct {
		name       string
		tokenRules string
		url        string
		updateFunc func()
		wantWait   time.Duration
		wantCode   int
		wantErr    string
		wantResp   *api.AgentService
	}{
		{
			name: "simple fetch - proxy",
			url: "/v1/agent/service/web-sidecar-proxy",
			wantCode: 200,
			wantResp: expectedResponse,
		},
		{
			name: "simple fetch - non-proxy",
			url: "/v1/agent/service/web",
			wantCode: 200,
			wantResp: expectWebResponse,
		},
		{
			name: "blocking fetch timeout, no change",
			url: "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash + "&wait=100ms",
			wantWait: 100 * time.Millisecond,
			wantCode: 200,
			wantResp: expectedResponse,
		},
		{
			name: "blocking fetch old hash should return immediately",
			url: "/v1/agent/service/web-sidecar-proxy?hash=123456789abcd&wait=10m",
			wantCode: 200,
			wantResp: expectedResponse,
		},
		{
			name: "blocking fetch returns change",
			url: "/v1/agent/service/web-sidecar-proxy?hash=" + expectedResponse.ContentHash,
			updateFunc: func() {
				time.Sleep(100 * time.Millisecond)
				// Re-register with new proxy config, make sure we copy the struct so we
				// don't alter it and affect later test cases.
				req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(updatedProxy))
				resp := httptest.NewRecorder()
				_, err := a.srv.AgentRegisterService(resp, req)
				require.NoError(t, err)
				require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
			},
			wantWait: 100 * time.Millisecond,
			wantCode: 200,
			wantResp: &updatedResponse,
		},
		{
			// This test exercises a case that caused a busy loop to eat CPU for the
			// entire duration of the blocking query. If a service gets re-registered
			// with the same proxy config then the old proxy config chan is closed,
			// causing the blocked watchset.Watch to return false, indicating a change.
			// But since the hash is the same when the blocking fn is re-called, we
			// should just keep blocking on the next iteration. The bug hit was that
			// the WatchSet ws was not being reset in the loop, so the second time you
			// try to `Watch` it it returns immediately, turning the blocking loop into
			// a busy-poll!
			//
			// This test doesn't catch that though, because a busy poll still has the
			// correct external behaviour. I don't want to instrument the loop to
			// assert it's not executing too fast here as I can't think of a clean way,
			// and the issue is fixed now, so this test doesn't actually catch the
			// error, but it does provide an easy way to verify the behaviour by hand:
			//  1. Make this test fail, e.g. change wantErr to true.
			//  2. Add a log.Println or similar into the blocking loop/function.
			//  3. See whether it's called just once or many times in a tight loop.
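			//
			// A rough sketch of the corrected blocking pattern (hypothetical helper
			// readServiceAndHash and channel names, not the actual agent code): the
			// key point is that the memdb.WatchSet is rebuilt on every iteration so
			// each Watch call blocks on the channels of the *current* state:
			//
			//	for {
			//		ws := memdb.NewWatchSet()
			//		svc, hash, err := readServiceAndHash(ws)
			//		if err != nil || hash != wantHash {
			//			return svc, err
			//		}
			//		if timedOut := ws.Watch(timeoutCh); timedOut {
			//			return svc, nil
			//		}
			//		// Hash unchanged: loop and block again on a fresh WatchSet.
			//	}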
name : "blocking fetch interrupted with no change (same hash)" ,
url : "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse . ContentHash ,
updateFunc : func ( ) {
time . Sleep ( 100 * time . Millisecond )
// Re-register with _same_ proxy config
req , _ := http . NewRequest ( "PUT" , "/v1/agent/service/register?token=root" , jsonReader ( sidecarProxy ) )
resp := httptest . NewRecorder ( )
_ , err := a . srv . AgentRegisterService ( resp , req )
require . NoError ( t , err )
require . Equal ( t , 200 , resp . Code , "body: %s" , resp . Body . String ( ) )
} ,
wantWait : 200 * time . Millisecond ,
wantCode : 200 ,
wantResp : expectedResponse ,
} ,
		{
			// When we reload config, the agent pauses Anti-entropy, then clears all
			// services (which causes their watch chans to be closed) before loading
			// state from config/snapshot again. If we do that naively then we don't
			// just get a spurious wakeup on the watch when the service didn't change:
			// the wakeup races with the reload, probably sees no services, and returns
			// a 404 error, which is gross. This test exercises that - even though the
			// registrations were from the API not config, they are persisted and
			// cleared/reloaded from the snapshot, which has the same effect.
			//
			// The fix for this test is to allow the same mechanism that pauses
			// Anti-entropy during reload to also pause the hash blocking loop, so we
			// don't resume until the state is reloaded and we get a chance to see
			// whether it actually changed or not.
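			//
			// A sketch of that coordination (hypothetical names, not the actual agent
			// code): the reload pauses sync, swaps the state, then resumes, while a
			// woken blocking query waits for any in-progress reload before re-reading:
			//
			//	// In the blocking loop, after the watch fires:
			//	if pausedCh := syncPausedCh(); pausedCh != nil {
			//		<-pausedCh // block until the reload has finished
			//	}
			//	// Now it is safe to re-read the service and compare hashes as usual.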
name : "blocking fetch interrupted by reload shouldn't 404 - no change" ,
url : "/v1/agent/service/web-sidecar-proxy?wait=200ms&hash=" + expectedResponse . ContentHash ,
updateFunc : func ( ) {
time . Sleep ( 100 * time . Millisecond )
// Reload
require . NoError ( t , a . ReloadConfig ( a . Config ) )
} ,
// Should eventually timeout since there is no actual change
wantWait : 200 * time . Millisecond ,
wantCode : 200 ,
wantResp : expectedResponse ,
} ,
		{
			// As above but test actually altering the service with the config reload.
			// This simulates the API registration being overridden by a different one
			// on disk during reload.
			name: "blocking fetch interrupted by reload shouldn't 404 - changes",
			url: "/v1/agent/service/web-sidecar-proxy?wait=10m&hash=" + expectedResponse.ContentHash,
			updateFunc: func() {
				time.Sleep(100 * time.Millisecond)
				// Reload
				newConfig := *a.Config
				newConfig.Services = append(newConfig.Services, &updatedProxy)
				require.NoError(t, a.ReloadConfig(&newConfig))
			},
			wantWait: 100 * time.Millisecond,
			wantCode: 200,
			wantResp: &updatedResponse,
		},
		{
			name: "err: non-existent proxy",
			url: "/v1/agent/service/nope",
			wantCode: 404,
		},
		{
			name: "err: bad ACL for service",
			url: "/v1/agent/service/web-sidecar-proxy",
			// Limited token doesn't grant read to the service
			tokenRules: `
			key "" {
				policy = "read"
			}
			`,
			// Note that because we return ErrPermissionDenied and the status is
			// written by a higher-level helper, resp.Code stays 200 in this test
			// case, so just assert that an error was returned.
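			//
			// In production the HTTP wrapper that invokes these handlers inspects the
			// returned error and writes the status itself, roughly (a sketch, not the
			// exact wrapper code):
			//
			//	obj, err := handler(resp, req)
			//	if err != nil && strings.Contains(err.Error(), "Permission denied") {
			//		resp.WriteHeader(http.StatusForbidden)
			//	}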
			wantErr: "Permission denied",
		},
		{
			name: "good ACL for service",
			url: "/v1/agent/service/web-sidecar-proxy",
			// Limited token that does grant read to the service
			tokenRules: `
			service "web-sidecar-proxy" {
				policy = "read"
			}
			`,
			wantCode: 200,
			wantResp: expectedResponse,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			assert := assert.New(t)
			require := require.New(t)

			// Register the basic service to ensure it's in a known state to start.
			{
				req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=root", jsonReader(sidecarProxy))
				resp := httptest.NewRecorder()
				_, err := a.srv.AgentRegisterService(resp, req)
				require.NoError(err)
				require.Equal(200, resp.Code, "body: %s", resp.Body.String())
			}

			req, _ := http.NewRequest("GET", tt.url, nil)

			// Inject the root token for tests that don't care about ACL
			var token = "root"
			if tt.tokenRules != "" {
				// Create new token and use that.
				token = testCreateToken(t, a, tt.tokenRules)
			}
			req.Header.Set("X-Consul-Token", token)

			resp := httptest.NewRecorder()
			if tt.updateFunc != nil {
				go tt.updateFunc()
			}
			start := time.Now()
			obj, err := a.srv.AgentService(resp, req)
			elapsed := time.Now().Sub(start)

			if tt.wantErr != "" {
				require.Error(err)
				require.Contains(strings.ToLower(err.Error()), strings.ToLower(tt.wantErr))
			} else {
				require.NoError(err)
			}
			if tt.wantCode != 0 {
				require.Equal(tt.wantCode, resp.Code, "body: %s", resp.Body.String())
			}
			if tt.wantWait != 0 {
				assert.True(elapsed >= tt.wantWait, "should have waited at least %s, "+
					"took %s", tt.wantWait, elapsed)
			} else {
				assert.True(elapsed < 10*time.Millisecond, "should not have waited, "+
					"took %s", elapsed)
			}
			if tt.wantResp != nil {
				assert.Equal(tt.wantResp, obj)
				assert.Equal(tt.wantResp.ContentHash, resp.Header().Get("X-Consul-ContentHash"))
			} else {
				// Janky, but Equal doesn't help here because an untyped nil is not
				// equal to an interface holding a typed nil, (*api.AgentService)(nil).
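				//
				// For example (a plain Go fact, not anything agent-specific):
				//
				//	var s *api.AgentService   // s == nil
				//	var o interface{} = s     // o now holds a typed nil
				//	fmt.Println(o == nil)     // prints false
				//
				// assert.Nil checks via reflection, so it treats the typed nil as nil.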
				assert.Nil(obj)
			}
		})
	}
}

// DEPRECATED(managed-proxies) - remove this. In the interim, we need the newer
// /agent/service/service endpoint to work for managed proxies so we can switch
// the built-in proxy to use only that without breaking managed proxies early.
func TestAgent_Service_DeprecatedManagedProxy(t *testing.T) {
	t.Parallel()
	a := NewTestAgent(t.Name(), `
	connect {
		proxy {
			allow_managed_api_registration = true
		}
	}
	`)
	defer a.Shutdown()

	testrpc.WaitForLeader(t, a.RPC, "dc1")

	svc := &structs.ServiceDefinition{
		Name: "web",
		Port: 8000,
		Check: structs.CheckType{
			TTL: 10 * time.Second,
		},
		Connect: &structs.ServiceConnect{
			Proxy: &structs.ServiceDefinitionConnectProxy{
				// Fix the command, otherwise the executable path ends up being a random
				// temp dir in every test run and the ContentHash will never match.
				Command: []string{"foo"},
				Config: map[string]interface{}{
					"foo": "bar",
					"bind_address": "10.10.10.10",
					"bind_port": 9999, // make this deterministic
				},
				Upstreams: structs.TestUpstreams(t),
			},
		},
	}

	require := require.New(t)

	rr := httptest.NewRecorder()
	req, _ := http.NewRequest("POST", "/v1/agent/services/register", jsonReader(svc))
	_, err := a.srv.AgentRegisterService(rr, req)
	require.NoError(err)
	require.Equal(200, rr.Code, "body:\n"+rr.Body.String())

	rr = httptest.NewRecorder()
	req, _ = http.NewRequest("GET", "/v1/agent/service/web-proxy", nil)
	obj, err := a.srv.AgentService(rr, req)
	require.NoError(err)
	require.Equal(200, rr.Code, "body:\n"+rr.Body.String())

	gotService, ok := obj.(*api.AgentService)
	require.True(ok)

	expect := &api.AgentService{
		Kind: api.ServiceKindConnectProxy,
		ID: "web-proxy",
		Service: "web-proxy",
		Port: 9999,
		Address: "10.10.10.10",
		ContentHash: "e24f099e42e88317",
		Proxy: &api.AgentServiceConnectProxyConfig{
			DestinationServiceID: "web",
			DestinationServiceName: "web",
			LocalServiceAddress: "127.0.0.1",
			LocalServicePort: 8000,
			Config: map[string]interface{}{
				"foo": "bar",
				"bind_port": 9999,
				"bind_address": "10.10.10.10",
				"local_service_address": "127.0.0.1:8000",
			},
			Upstreams: structs.TestAddDefaultsToUpstreams(t, svc.Connect.Proxy.Upstreams).ToAPI(),
		},
	}

	require.Equal(expect, gotService)
}

func TestAgent_Checks(t *testing.T) {
	t.Parallel()
	a := NewTestAgent(t.Name(), "")
@@ -1593,38 +1954,49 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
				},
			},
		},
		SidecarService: &structs.ServiceDefinition{
			Name: "test-proxy",
			Meta: map[string]string{
				"some": "meta",
				"enable_tag_override": "sidecar_service.meta is 'opaque' so should not get translated",
			},
			Port: 8001,
			EnableTagOverride: true,
			Kind: structs.ServiceKindConnectProxy,
			Proxy: &structs.ConnectProxyConfig{
				DestinationServiceName: "test",
				DestinationServiceID: "test",
				LocalServiceAddress: "127.0.0.1",
				LocalServicePort: 4321,
				Upstreams: structs.Upstreams{
					{
						DestinationType: structs.UpstreamDestTypeService,
						DestinationName: "db",
						DestinationNamespace: "default",
						LocalBindAddress: "127.0.0.1",
						LocalBindPort: 1234,
						Config: map[string]interface{}{
							"destination_type": "sidecar_service.proxy.upstreams.config is 'opaque' so should not get translated",
						},
					},
		// The sidecar service is nilled since it is only config sugar and
		// shouldn't be represented in state. We assert that the translations
		// there worked by inspecting the registered sidecar below.
		SidecarService: nil,
		},
	}

	got := a.State.Service("test")
	require.Equal(t, svc, got)

	sidecarSvc := &structs.NodeService{
		Kind: structs.ServiceKindConnectProxy,
		ID: "test-sidecar-proxy",
		Service: "test-proxy",
		Meta: map[string]string{
			"some": "meta",
			"enable_tag_override": "sidecar_service.meta is 'opaque' so should not get translated",
		},
		Port: 8001,
		EnableTagOverride: true,
		LocallyRegisteredAsSidecar: true,
		Proxy: structs.ConnectProxyConfig{
			DestinationServiceName: "test",
			DestinationServiceID: "test",
			LocalServiceAddress: "127.0.0.1",
			LocalServicePort: 4321,
			Upstreams: structs.Upstreams{
				{
					DestinationType: structs.UpstreamDestTypeService,
					DestinationName: "db",
					DestinationNamespace: "default",
					LocalBindAddress: "127.0.0.1",
					LocalBindPort: 1234,
					Config: map[string]interface{}{
						"destination_type": "sidecar_service.proxy.upstreams.config is 'opaque' so should not get translated",
					},
				},
			},
		},
	}

	got := a.State.Service("test")
	require.Equal(t, svc, got)
	gotSidecar := a.State.Service("test-sidecar-proxy")
	require.Equal(t, sidecarSvc, gotSidecar)
}

func TestAgent_RegisterService_ACLDeny(t *testing.T) {
@@ -1978,6 +2350,21 @@ func testDefaultSidecar(svc string, port int, fns ...func(*structs.NodeService))
	return ns
}

// testCreateToken creates an ACL token with the given rules via the agent's
// /v1/acl/create endpoint and returns its ID, so tests can pass it as a
// request token.
func testCreateToken(t *testing.T, a *TestAgent, rules string) string {
	args := map[string]interface{}{
		"Name": "User Token",
		"Type": "client",
		"Rules": rules,
	}
	req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args))
	resp := httptest.NewRecorder()
	obj, err := a.srv.ACLCreate(resp, req)
	require.NoError(t, err)
	require.NotNil(t, obj)
	aclResp := obj.(aclCreateResponse)
	return aclResp.ID
}
// This tests local agent service registration with a sidecar service. Note we
// only test simple defaults for the sidecar here since the actual logic for
// handling sidecar defaults and port assignment is tested thoroughly in
@@ -2361,18 +2748,7 @@ func TestAgent_RegisterServiceDeregisterService_Sidecar(t *testing.T) {
			// Create an ACL token with the required policy
			var token string
			if tt.enableACL && tt.tokenRules != "" {
				args := map[string]interface{}{
					"Name": "User Token",
					"Type": "client",
					"Rules": tt.tokenRules,
				}
				req, _ := http.NewRequest("PUT", "/v1/acl/create?token=root", jsonReader(args))
				resp := httptest.NewRecorder()
				obj, err := a.srv.ACLCreate(resp, req)
				require.NoError(err)
				require.NotNil(obj)
				aclResp := obj.(aclCreateResponse)
				token = aclResp.ID
				token = testCreateToken(t, a, tt.tokenRules)
			}

			br := bytes.NewBufferString(tt.json)