package proxycfg
import (
"log"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
)
// assertLastReqArgs verifies that each request type had the correct source
// parameters (e.g. Datacenter name) and token.
func assertLastReqArgs ( t * testing . T , types * TestCacheTypes , token string , source * structs . QuerySource ) {
t . Helper ( )
// Roots needs correct DC and token
rootReq := types . roots . lastReq . Load ( )
require . IsType ( t , rootReq , & structs . DCSpecificRequest { } )
require . Equal ( t , token , rootReq . ( * structs . DCSpecificRequest ) . Token )
require . Equal ( t , source . Datacenter , rootReq . ( * structs . DCSpecificRequest ) . Datacenter )
// Leaf needs correct DC and token
leafReq := types . leaf . lastReq . Load ( )
require . IsType ( t , leafReq , & cachetype . ConnectCALeafRequest { } )
require . Equal ( t , token , leafReq . ( * cachetype . ConnectCALeafRequest ) . Token )
require . Equal ( t , source . Datacenter , leafReq . ( * cachetype . ConnectCALeafRequest ) . Datacenter )
// Intentions needs correct DC and token
intReq := types . intentions . lastReq . Load ( )
require . IsType ( t , intReq , & structs . IntentionQueryRequest { } )
require . Equal ( t , token , intReq . ( * structs . IntentionQueryRequest ) . Token )
require . Equal ( t , source . Datacenter , intReq . ( * structs . IntentionQueryRequest ) . Datacenter )
}
func TestManager_BasicLifecycle ( t * testing . T ) {
// Use a mocked cache to make life simpler
types := NewTestCacheTypes ( t )
c := TestCacheWithTypes ( t , types )
require := require . New ( t )
roots , leaf := TestCerts ( t )
// Setup initial values
types . roots . value . Store ( roots )
types . leaf . value . Store ( leaf )
types . intentions . value . Store ( TestIntentions ( t ) )
types . health . value . Store (
& structs . IndexedCheckServiceNodes {
Nodes : TestUpstreamNodes ( t ) ,
} )
logger := log . New ( os . Stderr , "" , log . LstdFlags )
state := local . NewState ( local . Config { } , logger , & token . Store { } )
source := & structs . QuerySource {
Node : "node1" ,
Datacenter : "dc1" ,
}
// Stub state syncing
state . TriggerSyncChanges = func ( ) { }
// Create manager
m , err := NewManager ( ManagerConfig { c , state , source , logger } )
require . NoError ( err )
// And run it
go func ( ) {
err := m . Run ( )
require . NoError ( err )
} ( )
// Register a proxy for "web"
webProxy := & structs . NodeService {
Kind : structs . ServiceKindConnectProxy ,
ID : "web-sidecar-proxy" ,
Service : "web-sidecar-proxy" ,
Port : 9999 ,
Proxy : structs . ConnectProxyConfig {
DestinationServiceID : "web" ,
DestinationServiceName : "web" ,
LocalServiceAddress : "127.0.0.1" ,
LocalServicePort : 8080 ,
Config : map [ string ] interface { } {
"foo" : "bar" ,
} ,
Upstreams : structs . TestUpstreams ( t ) ,
} ,
}
// BEFORE we register, we should be able to get a watch channel
wCh , cancel := m . Watch ( webProxy . ID )
defer cancel ( )
// And it should block with nothing sent on it yet
assertWatchChanBlocks ( t , wCh )
require . NoError ( state . AddService ( webProxy , "my-token" ) )
// We should see the initial config delivered but not until after the
// coalesce timeout
expectSnap := & ConfigSnapshot {
Kind : structs . ServiceKindConnectProxy ,
Service : webProxy . Service ,
ProxyID : webProxy . ID ,
Address : webProxy . Address ,
Port : webProxy . Port ,
Proxy : webProxy . Proxy ,
TaggedAddresses : make ( map [ string ] structs . ServiceAddress ) ,
Roots : roots ,
Leaf : leaf ,
UpstreamEndpoints : map [ string ] structs . CheckServiceNodes {
"db" : TestUpstreamNodes ( t ) ,
} ,
Datacenter : "dc1" ,
}
start := time . Now ( )
assertWatchChanRecvs ( t , wCh , expectSnap )
require . True ( time . Since ( start ) >= coalesceTimeout )
assertLastReqArgs ( t , types , "my-token" , source )
// Update NodeConfig
webProxy . Port = 7777
require . NoError ( state . AddService ( webProxy , "my-token" ) )
expectSnap . Port = 7777
assertWatchChanRecvs ( t , wCh , expectSnap )
// Register a second watcher
wCh2 , cancel2 := m . Watch ( webProxy . ID )
defer cancel2 ( )
// New watcher should immediately receive the current state
assertWatchChanRecvs ( t , wCh2 , expectSnap )
// Change token
require . NoError ( state . AddService ( webProxy , "other-token" ) )
assertWatchChanRecvs ( t , wCh , expectSnap )
assertWatchChanRecvs ( t , wCh2 , expectSnap )
// This is actually sort of timing dependent - the cache background fetcher
// will still be fetching with the old token, but we rely on the fact that our
// mock type will have been blocked on those for a while.
assertLastReqArgs ( t , types , "other-token" , source )
// Update roots
newRoots , newLeaf := TestCerts ( t )
newRoots . Roots = append ( newRoots . Roots , roots . Roots ... )
types . roots . Set ( newRoots )
// Expect new roots in snapshot
expectSnap . Roots = newRoots
assertWatchChanRecvs ( t , wCh , expectSnap )
assertWatchChanRecvs ( t , wCh2 , expectSnap )
// Update leaf
types . leaf . Set ( newLeaf )
// Expect new roots in snapshot
expectSnap . Leaf = newLeaf
assertWatchChanRecvs ( t , wCh , expectSnap )
assertWatchChanRecvs ( t , wCh2 , expectSnap )
// Remove the proxy
state . RemoveService ( webProxy . ID )
// Chan should NOT close
assertWatchChanBlocks ( t , wCh )
assertWatchChanBlocks ( t , wCh2 )
// Re-add the proxy with another new port
webProxy . Port = 3333
require . NoError ( state . AddService ( webProxy , "other-token" ) )
// Same watch chan should be notified again
expectSnap . Port = 3333
assertWatchChanRecvs ( t , wCh , expectSnap )
assertWatchChanRecvs ( t , wCh2 , expectSnap )
// Cancel watch
cancel ( )
// Watch chan should be closed
assertWatchChanRecvs ( t , wCh , nil )
// We specifically don't remove the proxy or cancel the second watcher to
// ensure both are cleaned up by close.
require . NoError ( m . Close ( ) )
// Sanity check the state is clean
m . mu . Lock ( )
defer m . mu . Unlock ( )
require . Len ( m . proxies , 0 )
require . Len ( m . watchers , 0 )
}
func assertWatchChanBlocks ( t * testing . T , ch <- chan * ConfigSnapshot ) {
t . Helper ( )
select {
case <- ch :
t . Fatal ( "Should be nothing sent on watch chan yet" )
default :
}
}
func assertWatchChanRecvs ( t * testing . T , ch <- chan * ConfigSnapshot , expect * ConfigSnapshot ) {
t . Helper ( )
select {
case got , ok := <- ch :
require . Equal ( t , expect , got )
if expect == nil {
require . False ( t , ok , "watch chan should be closed" )
}
case <- time . After ( 50 * time . Millisecond + coalesceTimeout ) :
t . Fatal ( "recv timeout" )
}
}
func TestManager_deliverLatest ( t * testing . T ) {
// None of these need to do anything to test this method just be valid
logger := log . New ( os . Stderr , "" , log . LstdFlags )
cfg := ManagerConfig {
Cache : cache . New ( nil ) ,
State : local . NewState ( local . Config { } , logger , & token . Store { } ) ,
Source : & structs . QuerySource {
Node : "node1" ,
Datacenter : "dc1" ,
} ,
Logger : logger ,
}
require := require . New ( t )
m , err := NewManager ( cfg )
require . NoError ( err )
snap1 := & ConfigSnapshot {
ProxyID : "test-proxy" ,
Port : 1111 ,
}
snap2 := & ConfigSnapshot {
ProxyID : "test-proxy" ,
Port : 2222 ,
}
// Put an overall time limit on this test case so we don't have to guard every
// call to ensure the whole test doesn't deadlock.
time . AfterFunc ( 100 * time . Millisecond , func ( ) {
t . Fatal ( "test timed out" )
} )
// test 1 buffered chan
ch1 := make ( chan * ConfigSnapshot , 1 )
// Sending to an unblocked chan should work
m . deliverLatest ( snap1 , ch1 )
// Check it was delivered
require . Equal ( snap1 , <- ch1 )
// Now send both without reading simulating a slow client
m . deliverLatest ( snap1 , ch1 )
m . deliverLatest ( snap2 , ch1 )
// Check we got the _second_ one
require . Equal ( snap2 , <- ch1 )
// Same again for 5-buffered chan
ch5 := make ( chan * ConfigSnapshot , 5 )
// Sending to an unblocked chan should work
m . deliverLatest ( snap1 , ch5 )
// Check it was delivered
require . Equal ( snap1 , <- ch5 )
// Now send enough to fill the chan simulating a slow client
for i := 0 ; i < 5 ; i ++ {
m . deliverLatest ( snap1 , ch5 )
}
m . deliverLatest ( snap2 , ch5 )
// Check we got the _second_ one
require . Equal ( snap2 , <- ch5 )
}