mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
376 lines
14 KiB
376 lines
14 KiB
package proxycfg |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"strings" |
|
|
|
"github.com/hashicorp/consul/agent/cache" |
|
cachetype "github.com/hashicorp/consul/agent/cache-types" |
|
"github.com/hashicorp/consul/agent/structs" |
|
) |
|
|
|
type handlerTerminatingGateway struct { |
|
handlerState |
|
} |
|
|
|
// initialize sets up the initial watches needed based on the terminating-gateway registration |
|
func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { |
|
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) |
|
// Watch for root changes |
|
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Source: *s.source, |
|
}, rootsWatchID, s.ch) |
|
if err != nil { |
|
s.logger.Error("failed to register watch for root changes", "error", err) |
|
return snap, err |
|
} |
|
|
|
// Get information about the entire service mesh. |
|
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{ |
|
Kind: structs.MeshConfig, |
|
Name: structs.MeshConfigMesh, |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(s.proxyID.PartitionOrDefault()), |
|
}, meshConfigEntryID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch for the terminating-gateway's linked services |
|
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
ServiceName: s.service, |
|
EnterpriseMeta: s.proxyID.EnterpriseMeta, |
|
}, gatewayServicesWatchID, s.ch) |
|
if err != nil { |
|
s.logger.Error("failed to register watch for linked services", "error", err) |
|
return snap, err |
|
} |
|
|
|
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc) |
|
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc) |
|
snap.TerminatingGateway.Intentions = make(map[structs.ServiceName]structs.Intentions) |
|
snap.TerminatingGateway.WatchedLeaves = make(map[structs.ServiceName]context.CancelFunc) |
|
snap.TerminatingGateway.ServiceLeaves = make(map[structs.ServiceName]*structs.IssuedCert) |
|
snap.TerminatingGateway.WatchedConfigs = make(map[structs.ServiceName]context.CancelFunc) |
|
snap.TerminatingGateway.ServiceConfigs = make(map[structs.ServiceName]*structs.ServiceConfigResponse) |
|
snap.TerminatingGateway.WatchedResolvers = make(map[structs.ServiceName]context.CancelFunc) |
|
snap.TerminatingGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry) |
|
snap.TerminatingGateway.ServiceResolversSet = make(map[structs.ServiceName]bool) |
|
snap.TerminatingGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes) |
|
snap.TerminatingGateway.GatewayServices = make(map[structs.ServiceName]structs.GatewayService) |
|
snap.TerminatingGateway.HostnameServices = make(map[structs.ServiceName]structs.CheckServiceNodes) |
|
return snap, nil |
|
} |
|
|
|
func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error { |
|
if u.Err != nil { |
|
return fmt.Errorf("error filling agent cache: %v", u.Err) |
|
} |
|
logger := s.logger |
|
|
|
switch { |
|
case u.CorrelationID == rootsWatchID: |
|
roots, ok := u.Result.(*structs.IndexedCARoots) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
snap.Roots = roots |
|
|
|
case u.CorrelationID == meshConfigEntryID: |
|
resp, ok := u.Result.(*structs.ConfigEntryResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
if resp.Entry != nil { |
|
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry) |
|
if !ok { |
|
return fmt.Errorf("invalid type for config entry: %T", resp.Entry) |
|
} |
|
snap.TerminatingGateway.MeshConfig = meshConf |
|
} else { |
|
snap.TerminatingGateway.MeshConfig = nil |
|
} |
|
snap.TerminatingGateway.MeshConfigSet = true |
|
|
|
// Update watches based on the current list of services associated with the terminating-gateway |
|
case u.CorrelationID == gatewayServicesWatchID: |
|
services, ok := u.Result.(*structs.IndexedGatewayServices) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
svcMap := make(map[structs.ServiceName]struct{}) |
|
for _, svc := range services.Services { |
|
// Make sure to add every service to this map, we use it to cancel watches below. |
|
svcMap[svc.Service] = struct{}{} |
|
|
|
// Store the gateway <-> service mapping for TLS origination |
|
snap.TerminatingGateway.GatewayServices[svc.Service] = *svc |
|
|
|
// Watch the health endpoint to discover endpoints for the service |
|
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
ServiceName: svc.Service.Name, |
|
EnterpriseMeta: svc.Service.EnterpriseMeta, |
|
|
|
// The gateway acts as the service's proxy, so we do NOT want to discover other proxies |
|
Connect: false, |
|
}, externalServiceIDPrefix+svc.Service.String(), s.ch) |
|
|
|
if err != nil { |
|
logger.Error("failed to register watch for external-service", |
|
"service", svc.Service.String(), |
|
"error", err, |
|
) |
|
cancel() |
|
return err |
|
} |
|
snap.TerminatingGateway.WatchedServices[svc.Service] = cancel |
|
} |
|
|
|
// Watch intentions with this service as their destination |
|
// The gateway will enforce intentions for connections to the service |
|
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok { |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Match: &structs.IntentionQueryMatch{ |
|
Type: structs.IntentionMatchDestination, |
|
Entries: []structs.IntentionMatchEntry{ |
|
{ |
|
Namespace: svc.Service.NamespaceOrDefault(), |
|
Partition: svc.Service.PartitionOrDefault(), |
|
Name: svc.Service.Name, |
|
}, |
|
}, |
|
}, |
|
}, serviceIntentionsIDPrefix+svc.Service.String(), s.ch) |
|
|
|
if err != nil { |
|
logger.Error("failed to register watch for service-intentions", |
|
"service", svc.Service.String(), |
|
"error", err, |
|
) |
|
cancel() |
|
return err |
|
} |
|
snap.TerminatingGateway.WatchedIntentions[svc.Service] = cancel |
|
} |
|
|
|
// Watch leaf certificate for the service |
|
// This cert is used to terminate mTLS connections on the service's behalf |
|
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok { |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{ |
|
Datacenter: s.source.Datacenter, |
|
Token: s.token, |
|
Service: svc.Service.Name, |
|
EnterpriseMeta: svc.Service.EnterpriseMeta, |
|
}, serviceLeafIDPrefix+svc.Service.String(), s.ch) |
|
|
|
if err != nil { |
|
logger.Error("failed to register watch for a service-leaf", |
|
"service", svc.Service.String(), |
|
"error", err, |
|
) |
|
cancel() |
|
return err |
|
} |
|
snap.TerminatingGateway.WatchedLeaves[svc.Service] = cancel |
|
} |
|
|
|
// Watch service configs for the service. |
|
// These are used to determine the protocol for the target service. |
|
if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok { |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Name: svc.Service.Name, |
|
EnterpriseMeta: svc.Service.EnterpriseMeta, |
|
}, serviceConfigIDPrefix+svc.Service.String(), s.ch) |
|
|
|
if err != nil { |
|
logger.Error("failed to register watch for a resolved service config", |
|
"service", svc.Service.String(), |
|
"error", err, |
|
) |
|
cancel() |
|
return err |
|
} |
|
snap.TerminatingGateway.WatchedConfigs[svc.Service] = cancel |
|
} |
|
|
|
// Watch service resolvers for the service |
|
// These are used to create clusters and endpoints for the service subsets |
|
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok { |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Kind: structs.ServiceResolver, |
|
Name: svc.Service.Name, |
|
EnterpriseMeta: svc.Service.EnterpriseMeta, |
|
}, serviceResolverIDPrefix+svc.Service.String(), s.ch) |
|
|
|
if err != nil { |
|
logger.Error("failed to register watch for a service-resolver", |
|
"service", svc.Service.String(), |
|
"error", err, |
|
) |
|
cancel() |
|
return err |
|
} |
|
snap.TerminatingGateway.WatchedResolvers[svc.Service] = cancel |
|
} |
|
} |
|
|
|
// Delete gateway service mapping for services that were not in the update |
|
for sn := range snap.TerminatingGateway.GatewayServices { |
|
if _, ok := svcMap[sn]; !ok { |
|
delete(snap.TerminatingGateway.GatewayServices, sn) |
|
} |
|
} |
|
|
|
// Clean up services with hostname mapping for services that were not in the update |
|
for sn := range snap.TerminatingGateway.HostnameServices { |
|
if _, ok := svcMap[sn]; !ok { |
|
delete(snap.TerminatingGateway.HostnameServices, sn) |
|
} |
|
} |
|
|
|
// Cancel service instance watches for services that were not in the update |
|
for sn, cancelFn := range snap.TerminatingGateway.WatchedServices { |
|
if _, ok := svcMap[sn]; !ok { |
|
logger.Debug("canceling watch for service", "service", sn.String()) |
|
delete(snap.TerminatingGateway.WatchedServices, sn) |
|
delete(snap.TerminatingGateway.ServiceGroups, sn) |
|
cancelFn() |
|
} |
|
} |
|
|
|
// Cancel leaf cert watches for services that were not in the update |
|
for sn, cancelFn := range snap.TerminatingGateway.WatchedLeaves { |
|
if _, ok := svcMap[sn]; !ok { |
|
logger.Debug("canceling watch for leaf cert", "service", sn.String()) |
|
delete(snap.TerminatingGateway.WatchedLeaves, sn) |
|
delete(snap.TerminatingGateway.ServiceLeaves, sn) |
|
cancelFn() |
|
} |
|
} |
|
|
|
// Cancel service config watches for services that were not in the update |
|
for sn, cancelFn := range snap.TerminatingGateway.WatchedConfigs { |
|
if _, ok := svcMap[sn]; !ok { |
|
logger.Debug("canceling watch for resolved service config", "service", sn.String()) |
|
delete(snap.TerminatingGateway.WatchedConfigs, sn) |
|
delete(snap.TerminatingGateway.ServiceConfigs, sn) |
|
cancelFn() |
|
} |
|
} |
|
|
|
// Cancel service-resolver watches for services that were not in the update |
|
for sn, cancelFn := range snap.TerminatingGateway.WatchedResolvers { |
|
if _, ok := svcMap[sn]; !ok { |
|
logger.Debug("canceling watch for service-resolver", "service", sn.String()) |
|
delete(snap.TerminatingGateway.WatchedResolvers, sn) |
|
delete(snap.TerminatingGateway.ServiceResolvers, sn) |
|
delete(snap.TerminatingGateway.ServiceResolversSet, sn) |
|
cancelFn() |
|
} |
|
} |
|
|
|
// Cancel intention watches for services that were not in the update |
|
for sn, cancelFn := range snap.TerminatingGateway.WatchedIntentions { |
|
if _, ok := svcMap[sn]; !ok { |
|
logger.Debug("canceling watch for intention", "service", sn.String()) |
|
delete(snap.TerminatingGateway.WatchedIntentions, sn) |
|
delete(snap.TerminatingGateway.Intentions, sn) |
|
cancelFn() |
|
} |
|
} |
|
|
|
case strings.HasPrefix(u.CorrelationID, externalServiceIDPrefix): |
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, externalServiceIDPrefix)) |
|
delete(snap.TerminatingGateway.ServiceGroups, sn) |
|
delete(snap.TerminatingGateway.HostnameServices, sn) |
|
|
|
if len(resp.Nodes) > 0 { |
|
snap.TerminatingGateway.ServiceGroups[sn] = resp.Nodes |
|
snap.TerminatingGateway.HostnameServices[sn] = hostnameEndpoints( |
|
s.logger, |
|
snap.Locality, |
|
resp.Nodes, |
|
) |
|
} |
|
|
|
// Store leaf cert for watched service |
|
case strings.HasPrefix(u.CorrelationID, serviceLeafIDPrefix): |
|
leaf, ok := u.Result.(*structs.IssuedCert) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceLeafIDPrefix)) |
|
snap.TerminatingGateway.ServiceLeaves[sn] = leaf |
|
|
|
case strings.HasPrefix(u.CorrelationID, serviceConfigIDPrefix): |
|
serviceConfig, ok := u.Result.(*structs.ServiceConfigResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceConfigIDPrefix)) |
|
snap.TerminatingGateway.ServiceConfigs[sn] = serviceConfig |
|
|
|
case strings.HasPrefix(u.CorrelationID, serviceResolverIDPrefix): |
|
configEntries, ok := u.Result.(*structs.IndexedConfigEntries) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceResolverIDPrefix)) |
|
// There should only ever be one entry for a service resolver within a namespace |
|
if len(configEntries.Entries) == 1 { |
|
if resolver, ok := configEntries.Entries[0].(*structs.ServiceResolverConfigEntry); ok { |
|
snap.TerminatingGateway.ServiceResolvers[sn] = resolver |
|
} |
|
} |
|
snap.TerminatingGateway.ServiceResolversSet[sn] = true |
|
|
|
case strings.HasPrefix(u.CorrelationID, serviceIntentionsIDPrefix): |
|
resp, ok := u.Result.(*structs.IndexedIntentionMatches) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
|
|
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceIntentionsIDPrefix)) |
|
|
|
if len(resp.Matches) > 0 { |
|
// RPC supports matching multiple services at once but we only ever |
|
// query with the one service we represent currently so just pick |
|
// the one result set up. |
|
snap.TerminatingGateway.Intentions[sn] = resp.Matches[0] |
|
} |
|
|
|
default: |
|
// do nothing |
|
} |
|
|
|
return nil |
|
}
|
|
|