package proxycfg
import (
"context"
"fmt"
"strings"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
type handlerTerminatingGateway struct {
handlerState
}
// initialize sets up the initial watches needed based on the terminating-gateway registration
func ( s * handlerTerminatingGateway ) initialize ( ctx context . Context ) ( ConfigSnapshot , error ) {
snap := newConfigSnapshotFromServiceInstance ( s . serviceInstance , s . stateConfig )
// Watch for root changes
err := s . cache . Notify ( ctx , cachetype . ConnectCARootName , & structs . DCSpecificRequest {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
Source : * s . source ,
} , rootsWatchID , s . ch )
if err != nil {
s . logger . Error ( "failed to register watch for root changes" , "error" , err )
return snap , err
}
// Watch for the terminating-gateway's linked services
err = s . cache . Notify ( ctx , cachetype . GatewayServicesName , & structs . ServiceSpecificRequest {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
ServiceName : s . service ,
EnterpriseMeta : s . proxyID . EnterpriseMeta ,
} , gatewayServicesWatchID , s . ch )
if err != nil {
s . logger . Error ( "failed to register watch for linked services" , "error" , err )
return snap , err
}
snap . TerminatingGateway . WatchedServices = make ( map [ structs . ServiceName ] context . CancelFunc )
snap . TerminatingGateway . WatchedIntentions = make ( map [ structs . ServiceName ] context . CancelFunc )
snap . TerminatingGateway . Intentions = make ( map [ structs . ServiceName ] structs . Intentions )
snap . TerminatingGateway . WatchedLeaves = make ( map [ structs . ServiceName ] context . CancelFunc )
snap . TerminatingGateway . ServiceLeaves = make ( map [ structs . ServiceName ] * structs . IssuedCert )
snap . TerminatingGateway . WatchedConfigs = make ( map [ structs . ServiceName ] context . CancelFunc )
snap . TerminatingGateway . ServiceConfigs = make ( map [ structs . ServiceName ] * structs . ServiceConfigResponse )
snap . TerminatingGateway . WatchedResolvers = make ( map [ structs . ServiceName ] context . CancelFunc )
snap . TerminatingGateway . ServiceResolvers = make ( map [ structs . ServiceName ] * structs . ServiceResolverConfigEntry )
snap . TerminatingGateway . ServiceResolversSet = make ( map [ structs . ServiceName ] bool )
snap . TerminatingGateway . ServiceGroups = make ( map [ structs . ServiceName ] structs . CheckServiceNodes )
snap . TerminatingGateway . GatewayServices = make ( map [ structs . ServiceName ] structs . GatewayService )
snap . TerminatingGateway . HostnameServices = make ( map [ structs . ServiceName ] structs . CheckServiceNodes )
return snap , nil
}
func ( s * handlerTerminatingGateway ) handleUpdate ( ctx context . Context , u cache . UpdateEvent , snap * ConfigSnapshot ) error {
if u . Err != nil {
return fmt . Errorf ( "error filling agent cache: %v" , u . Err )
}
logger := s . logger
switch {
case u . CorrelationID == rootsWatchID :
roots , ok := u . Result . ( * structs . IndexedCARoots )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
snap . Roots = roots
// Update watches based on the current list of services associated with the terminating-gateway
case u . CorrelationID == gatewayServicesWatchID :
services , ok := u . Result . ( * structs . IndexedGatewayServices )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
svcMap := make ( map [ structs . ServiceName ] struct { } )
for _ , svc := range services . Services {
// Make sure to add every service to this map, we use it to cancel watches below.
svcMap [ svc . Service ] = struct { } { }
// Store the gateway <-> service mapping for TLS origination
snap . TerminatingGateway . GatewayServices [ svc . Service ] = * svc
// Watch the health endpoint to discover endpoints for the service
if _ , ok := snap . TerminatingGateway . WatchedServices [ svc . Service ] ; ! ok {
ctx , cancel := context . WithCancel ( ctx )
err := s . health . Notify ( ctx , structs . ServiceSpecificRequest {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
ServiceName : svc . Service . Name ,
EnterpriseMeta : svc . Service . EnterpriseMeta ,
// The gateway acts as the service's proxy, so we do NOT want to discover other proxies
Connect : false ,
} , externalServiceIDPrefix + svc . Service . String ( ) , s . ch )
if err != nil {
logger . Error ( "failed to register watch for external-service" ,
"service" , svc . Service . String ( ) ,
"error" , err ,
)
cancel ( )
return err
}
snap . TerminatingGateway . WatchedServices [ svc . Service ] = cancel
}
// Watch intentions with this service as their destination
// The gateway will enforce intentions for connections to the service
if _ , ok := snap . TerminatingGateway . WatchedIntentions [ svc . Service ] ; ! ok {
ctx , cancel := context . WithCancel ( ctx )
err := s . cache . Notify ( ctx , cachetype . IntentionMatchName , & structs . IntentionQueryRequest {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
Match : & structs . IntentionQueryMatch {
Type : structs . IntentionMatchDestination ,
Entries : [ ] structs . IntentionMatchEntry {
{
Namespace : svc . Service . NamespaceOrDefault ( ) ,
Partition : svc . Service . PartitionOrDefault ( ) ,
Name : svc . Service . Name ,
} ,
} ,
} ,
} , serviceIntentionsIDPrefix + svc . Service . String ( ) , s . ch )
if err != nil {
logger . Error ( "failed to register watch for service-intentions" ,
"service" , svc . Service . String ( ) ,
"error" , err ,
)
cancel ( )
return err
}
snap . TerminatingGateway . WatchedIntentions [ svc . Service ] = cancel
}
// Watch leaf certificate for the service
// This cert is used to terminate mTLS connections on the service's behalf
if _ , ok := snap . TerminatingGateway . WatchedLeaves [ svc . Service ] ; ! ok {
ctx , cancel := context . WithCancel ( ctx )
err := s . cache . Notify ( ctx , cachetype . ConnectCALeafName , & cachetype . ConnectCALeafRequest {
Datacenter : s . source . Datacenter ,
Token : s . token ,
Service : svc . Service . Name ,
EnterpriseMeta : svc . Service . EnterpriseMeta ,
} , serviceLeafIDPrefix + svc . Service . String ( ) , s . ch )
if err != nil {
logger . Error ( "failed to register watch for a service-leaf" ,
"service" , svc . Service . String ( ) ,
"error" , err ,
)
cancel ( )
return err
}
snap . TerminatingGateway . WatchedLeaves [ svc . Service ] = cancel
}
// Watch service configs for the service.
// These are used to determine the protocol for the target service.
if _ , ok := snap . TerminatingGateway . WatchedConfigs [ svc . Service ] ; ! ok {
ctx , cancel := context . WithCancel ( ctx )
err := s . cache . Notify ( ctx , cachetype . ResolvedServiceConfigName , & structs . ServiceConfigRequest {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
Name : svc . Service . Name ,
EnterpriseMeta : svc . Service . EnterpriseMeta ,
} , serviceConfigIDPrefix + svc . Service . String ( ) , s . ch )
if err != nil {
logger . Error ( "failed to register watch for a resolved service config" ,
"service" , svc . Service . String ( ) ,
"error" , err ,
)
cancel ( )
return err
}
snap . TerminatingGateway . WatchedConfigs [ svc . Service ] = cancel
}
// Watch service resolvers for the service
// These are used to create clusters and endpoints for the service subsets
if _ , ok := snap . TerminatingGateway . WatchedResolvers [ svc . Service ] ; ! ok {
ctx , cancel := context . WithCancel ( ctx )
err := s . cache . Notify ( ctx , cachetype . ConfigEntriesName , & structs . ConfigEntryQuery {
Datacenter : s . source . Datacenter ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
Kind : structs . ServiceResolver ,
Name : svc . Service . Name ,
EnterpriseMeta : svc . Service . EnterpriseMeta ,
} , serviceResolverIDPrefix + svc . Service . String ( ) , s . ch )
if err != nil {
logger . Error ( "failed to register watch for a service-resolver" ,
"service" , svc . Service . String ( ) ,
"error" , err ,
)
cancel ( )
return err
}
snap . TerminatingGateway . WatchedResolvers [ svc . Service ] = cancel
}
}
// Delete gateway service mapping for services that were not in the update
for sn := range snap . TerminatingGateway . GatewayServices {
if _ , ok := svcMap [ sn ] ; ! ok {
delete ( snap . TerminatingGateway . GatewayServices , sn )
}
}
// Clean up services with hostname mapping for services that were not in the update
for sn := range snap . TerminatingGateway . HostnameServices {
if _ , ok := svcMap [ sn ] ; ! ok {
delete ( snap . TerminatingGateway . HostnameServices , sn )
}
}
// Cancel service instance watches for services that were not in the update
for sn , cancelFn := range snap . TerminatingGateway . WatchedServices {
if _ , ok := svcMap [ sn ] ; ! ok {
logger . Debug ( "canceling watch for service" , "service" , sn . String ( ) )
delete ( snap . TerminatingGateway . WatchedServices , sn )
delete ( snap . TerminatingGateway . ServiceGroups , sn )
cancelFn ( )
}
}
// Cancel leaf cert watches for services that were not in the update
for sn , cancelFn := range snap . TerminatingGateway . WatchedLeaves {
if _ , ok := svcMap [ sn ] ; ! ok {
logger . Debug ( "canceling watch for leaf cert" , "service" , sn . String ( ) )
delete ( snap . TerminatingGateway . WatchedLeaves , sn )
delete ( snap . TerminatingGateway . ServiceLeaves , sn )
cancelFn ( )
}
}
// Cancel service config watches for services that were not in the update
for sn , cancelFn := range snap . TerminatingGateway . WatchedConfigs {
if _ , ok := svcMap [ sn ] ; ! ok {
logger . Debug ( "canceling watch for resolved service config" , "service" , sn . String ( ) )
delete ( snap . TerminatingGateway . WatchedConfigs , sn )
delete ( snap . TerminatingGateway . ServiceConfigs , sn )
cancelFn ( )
}
}
// Cancel service-resolver watches for services that were not in the update
for sn , cancelFn := range snap . TerminatingGateway . WatchedResolvers {
if _ , ok := svcMap [ sn ] ; ! ok {
logger . Debug ( "canceling watch for service-resolver" , "service" , sn . String ( ) )
delete ( snap . TerminatingGateway . WatchedResolvers , sn )
delete ( snap . TerminatingGateway . ServiceResolvers , sn )
delete ( snap . TerminatingGateway . ServiceResolversSet , sn )
cancelFn ( )
}
}
// Cancel intention watches for services that were not in the update
for sn , cancelFn := range snap . TerminatingGateway . WatchedIntentions {
if _ , ok := svcMap [ sn ] ; ! ok {
logger . Debug ( "canceling watch for intention" , "service" , sn . String ( ) )
delete ( snap . TerminatingGateway . WatchedIntentions , sn )
delete ( snap . TerminatingGateway . Intentions , sn )
cancelFn ( )
}
}
case strings . HasPrefix ( u . CorrelationID , externalServiceIDPrefix ) :
resp , ok := u . Result . ( * structs . IndexedCheckServiceNodes )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
sn := structs . ServiceNameFromString ( strings . TrimPrefix ( u . CorrelationID , externalServiceIDPrefix ) )
delete ( snap . TerminatingGateway . ServiceGroups , sn )
delete ( snap . TerminatingGateway . HostnameServices , sn )
if len ( resp . Nodes ) > 0 {
snap . TerminatingGateway . ServiceGroups [ sn ] = resp . Nodes
snap . TerminatingGateway . HostnameServices [ sn ] = hostnameEndpoints (
s . logger ,
GatewayKey { Partition : snap . ProxyID . PartitionOrDefault ( ) , Datacenter : snap . Datacenter } ,
resp . Nodes ,
)
}
// Store leaf cert for watched service
case strings . HasPrefix ( u . CorrelationID , serviceLeafIDPrefix ) :
leaf , ok := u . Result . ( * structs . IssuedCert )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
sn := structs . ServiceNameFromString ( strings . TrimPrefix ( u . CorrelationID , serviceLeafIDPrefix ) )
snap . TerminatingGateway . ServiceLeaves [ sn ] = leaf
case strings . HasPrefix ( u . CorrelationID , serviceConfigIDPrefix ) :
serviceConfig , ok := u . Result . ( * structs . ServiceConfigResponse )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
sn := structs . ServiceNameFromString ( strings . TrimPrefix ( u . CorrelationID , serviceConfigIDPrefix ) )
snap . TerminatingGateway . ServiceConfigs [ sn ] = serviceConfig
case strings . HasPrefix ( u . CorrelationID , serviceResolverIDPrefix ) :
configEntries , ok := u . Result . ( * structs . IndexedConfigEntries )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
sn := structs . ServiceNameFromString ( strings . TrimPrefix ( u . CorrelationID , serviceResolverIDPrefix ) )
// There should only ever be one entry for a service resolver within a namespace
if len ( configEntries . Entries ) == 1 {
if resolver , ok := configEntries . Entries [ 0 ] . ( * structs . ServiceResolverConfigEntry ) ; ok {
snap . TerminatingGateway . ServiceResolvers [ sn ] = resolver
}
}
snap . TerminatingGateway . ServiceResolversSet [ sn ] = true
case strings . HasPrefix ( u . CorrelationID , serviceIntentionsIDPrefix ) :
resp , ok := u . Result . ( * structs . IndexedIntentionMatches )
if ! ok {
return fmt . Errorf ( "invalid type for response: %T" , u . Result )
}
sn := structs . ServiceNameFromString ( strings . TrimPrefix ( u . CorrelationID , serviceIntentionsIDPrefix ) )
if len ( resp . Matches ) > 0 {
// RPC supports matching multiple services at once but we only ever
// query with the one service we represent currently so just pick
// the one result set up.
snap . TerminatingGateway . Intentions [ sn ] = resp . Matches [ 0 ]
}
default :
// do nothing
}
return nil
}