@ -822,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck ( t * testing . T ) {
// This test ensures that the following race condition does not block indefinitely:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
// endpoint's hash did not actually change, they would not be resent.
aclResolve := func ( id string ) ( acl . Authorizer , error ) {
// Allow all
return acl . RootAuthorizer ( "manage" ) , nil
}
scenario := newTestServerDeltaScenario ( t , aclResolve , "web-sidecar-proxy" , "" , 0 )
mgr , errCh , envoy := scenario . mgr , scenario . errCh , scenario . envoy
sid := structs . NewServiceID ( "web-sidecar-proxy" , nil )
// Register the proxy to create state needed to Watch() on
mgr . RegisterProxy ( t , sid )
var snap * proxycfg . ConfigSnapshot
testutil . RunStep ( t , "initial setup" , func ( t * testing . T ) {
snap = newTestSnapshot ( t , nil , "" , nil )
// Send initial cluster discover.
envoy . SendDeltaReq ( t , xdscommon . ClusterType , & envoy_discovery_v3 . DeltaDiscoveryRequest { } )
// Check no response sent yet
assertDeltaChanBlocked ( t , envoy . deltaStream . sendCh )
requireProtocolVersionGauge ( t , scenario , "v3" , 1 )
// Deliver a new snapshot (tcp with one tcp upstream)
mgr . DeliverConfig ( t , sid , snap )
assertDeltaResponseSent ( t , envoy . deltaStream . sendCh , & envoy_discovery_v3 . DeltaDiscoveryResponse {
TypeUrl : xdscommon . ClusterType ,
Nonce : hexString ( 1 ) ,
Resources : makeTestResources ( t ,
makeTestCluster ( t , snap , "tcp:local_app" ) ,
makeTestCluster ( t , snap , "tcp:db" ) ,
makeTestCluster ( t , snap , "tcp:geo-cache" ) ,
) ,
} )
} )
var newSnap * proxycfg . ConfigSnapshot
testutil . RunStep ( t , "resend cluster immediately" , func ( t * testing . T ) {
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
// sent to Envoy until the initial set of cluster message is ACKed.
newSnap = newTestSnapshot ( t , nil , "" , nil )
mgr . DeliverConfig ( t , sid , newSnap )
// Envoy then tries to discover endpoints for clusters.
envoy . SendDeltaReq ( t , xdscommon . EndpointType , & envoy_discovery_v3 . DeltaDiscoveryRequest {
ResourceNamesSubscribe : [ ] string {
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul" ,
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul" ,
} ,
} )
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent ( t , envoy . deltaStream . sendCh , & envoy_discovery_v3 . DeltaDiscoveryResponse {
TypeUrl : xdscommon . EndpointType ,
Nonce : hexString ( 2 ) ,
Resources : makeTestResources ( t ,
makeTestEndpoints ( t , snap , "tcp:db" ) ,
makeTestEndpoints ( t , snap , "tcp:geo-cache" ) ,
) ,
} )
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy . SendDeltaReqACK ( t , xdscommon . ClusterType , 1 )
// The updated cluster snapshot with new certificates is sent immediately
// after the first is ACKed.
assertDeltaResponseSent ( t , envoy . deltaStream . sendCh , & envoy_discovery_v3 . DeltaDiscoveryResponse {
TypeUrl : xdscommon . ClusterType ,
Nonce : hexString ( 3 ) ,
Resources : makeTestResources ( t ,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster ( t , newSnap , "tcp:db" ) ,
makeTestCluster ( t , newSnap , "tcp:geo-cache" ) ,
) ,
} )
} )
testutil . RunStep ( t , "resend endpoints" , func ( t * testing . T ) {
// Envoy requests listeners because it has received endpoints. We won't send listeners
// until Envoy ACKs the second cluster update.
envoy . SendDeltaReq ( t , xdscommon . ListenerType , nil )
// Envoy ACKs the endpoints from the first cluster update.
envoy . SendDeltaReqACK ( t , xdscommon . EndpointType , 2 )
// Resend endpoints because the clusters changed.
assertDeltaResponseSent ( t , envoy . deltaStream . sendCh , & envoy_discovery_v3 . DeltaDiscoveryResponse {
TypeUrl : xdscommon . EndpointType ,
Nonce : hexString ( 4 ) ,
Resources : makeTestResources ( t ,
makeTestEndpoints ( t , newSnap , "tcp:db" ) ,
makeTestEndpoints ( t , newSnap , "tcp:geo-cache" ) ,
) ,
} )
// Envoy ACKs the new cluster and endpoints.
envoy . SendDeltaReqACK ( t , xdscommon . ClusterType , 3 )
envoy . SendDeltaReqACK ( t , xdscommon . EndpointType , 4 )
// Listeners are sent after the cluster and endpoints are ACKed.
assertDeltaResponseSent ( t , envoy . deltaStream . sendCh , & envoy_discovery_v3 . DeltaDiscoveryResponse {
TypeUrl : xdscommon . ListenerType ,
Nonce : hexString ( 5 ) ,
Resources : makeTestResources ( t ,
makeTestListener ( t , newSnap , "tcp:public_listener" ) ,
makeTestListener ( t , newSnap , "tcp:db" ) ,
makeTestListener ( t , newSnap , "tcp:geo-cache" ) ,
) ,
} )
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked ( t , envoy . deltaStream . sendCh )
// ACKs the listener
envoy . SendDeltaReqACK ( t , xdscommon . ListenerType , 5 )
} )
envoy . Close ( )
select {
case err := <- errCh :
require . NoError ( t , err )
case <- time . After ( 50 * time . Millisecond ) :
t . Fatalf ( "timed out waiting for handler to finish" )
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes ( t * testing . T ) {
aclResolve := func ( id string ) ( acl . Authorizer , error ) {
// Allow all