mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
430 lines
16 KiB
430 lines
16 KiB
package proxycfg |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"strings" |
|
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types" |
|
"github.com/hashicorp/consul/agent/configentry" |
|
"github.com/hashicorp/consul/agent/consul/discoverychain" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/proto/pbpeering" |
|
) |
|
|
|
type handlerConnectProxy struct { |
|
handlerState |
|
} |
|
|
|
// initialize sets up the watches needed based on current proxy registration |
|
// state. |
|
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) { |
|
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) |
|
snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) |
|
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) |
|
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) |
|
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) |
|
snap.ConnectProxy.WatchedPeerTrustBundles = make(map[string]context.CancelFunc) |
|
snap.ConnectProxy.PeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle) |
|
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) |
|
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) |
|
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) |
|
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes) |
|
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream) |
|
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{}) |
|
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget) |
|
|
|
// Watch for root changes |
|
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Source: *s.source, |
|
}, rootsWatchID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch the leaf cert |
|
err = s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{ |
|
Datacenter: s.source.Datacenter, |
|
Token: s.token, |
|
Service: s.proxyCfg.DestinationServiceName, |
|
EnterpriseMeta: s.proxyID.EnterpriseMeta, |
|
}, leafWatchID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch for intention updates |
|
err = s.dataSources.Intentions.Notify(ctx, &structs.IntentionQueryRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Match: &structs.IntentionQueryMatch{ |
|
Type: structs.IntentionMatchDestination, |
|
Entries: []structs.IntentionMatchEntry{ |
|
{ |
|
Namespace: s.proxyID.NamespaceOrDefault(), |
|
Partition: s.proxyID.PartitionOrDefault(), |
|
Name: s.proxyCfg.DestinationServiceName, |
|
}, |
|
}, |
|
}, |
|
}, intentionsWatchID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Get information about the entire service mesh. |
|
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ |
|
Kind: structs.MeshConfig, |
|
Name: structs.MeshConfigMesh, |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(s.proxyID.PartitionOrDefault()), |
|
}, meshConfigEntryID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch for service check updates |
|
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{ |
|
ServiceID: s.proxyCfg.DestinationServiceID, |
|
EnterpriseMeta: s.proxyID.EnterpriseMeta, |
|
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
if s.proxyCfg.Mode == structs.ProxyModeTransparent { |
|
// When in transparent proxy we will infer upstreams from intentions with this source |
|
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
ServiceName: s.proxyCfg.DestinationServiceName, |
|
EnterpriseMeta: s.proxyID.EnterpriseMeta, |
|
}, intentionUpstreamsID, s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
} |
|
|
|
// Watch for updates to service endpoints for all upstreams |
|
for i := range s.proxyCfg.Upstreams { |
|
u := s.proxyCfg.Upstreams[i] |
|
|
|
uid := NewUpstreamID(&u) |
|
|
|
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams |
|
if u.DestinationName == structs.WildcardSpecifier { |
|
snap.ConnectProxy.UpstreamConfig[uid] = &u |
|
continue |
|
} |
|
|
|
snap.ConnectProxy.UpstreamConfig[uid] = &u |
|
// This can be true if the upstream is a synthetic entry populated from centralized upstream config. |
|
// Watches should not be created for them. |
|
if u.CentrallyConfigured { |
|
continue |
|
} |
|
|
|
dc := s.source.Datacenter |
|
if u.Datacenter != "" { |
|
dc = u.Datacenter |
|
} |
|
if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) { |
|
// In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch. |
|
continue |
|
} |
|
|
|
// Default the partition and namespace to the namespace of this proxy service. |
|
partition := s.proxyID.PartitionOrDefault() |
|
if u.DestinationPartition != "" { |
|
partition = u.DestinationPartition |
|
} |
|
ns := s.proxyID.NamespaceOrDefault() |
|
if u.DestinationNamespace != "" { |
|
ns = u.DestinationNamespace |
|
} |
|
|
|
cfg, err := parseReducedUpstreamConfig(u.Config) |
|
if err != nil { |
|
// Don't hard fail on a config typo, just warn. We'll fall back on |
|
// the plain discovery chain if there is an error so it's safe to |
|
// continue. |
|
s.logger.Warn("failed to parse upstream config", |
|
"upstream", uid.String(), |
|
"error", err, |
|
) |
|
} |
|
|
|
switch u.DestinationType { |
|
case structs.UpstreamDestTypePreparedQuery: |
|
err = s.dataSources.PreparedQuery.Notify(ctx, &structs.PreparedQueryExecuteRequest{ |
|
Datacenter: dc, |
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval}, |
|
QueryIDOrName: u.DestinationName, |
|
Connect: true, |
|
Source: *s.source, |
|
}, "upstream:"+uid.String(), s.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
case structs.UpstreamDestTypeService: |
|
fallthrough |
|
|
|
case "": |
|
// If DestinationPeer is not empty, insert a default discovery chain directly to the snapshot |
|
if u.DestinationPeer != "" { |
|
req := discoverychain.CompileRequest{ |
|
ServiceName: u.DestinationName, |
|
EvaluateInNamespace: u.DestinationNamespace, |
|
EvaluateInPartition: u.DestinationPartition, |
|
EvaluateInDatacenter: dc, |
|
EvaluateInTrustDomain: "trustdomain.consul", // TODO(peering): where to evaluate this? |
|
Entries: configentry.NewDiscoveryChainSet(), |
|
} |
|
chain, err := discoverychain.Compile(req) |
|
if err != nil { |
|
return snap, fmt.Errorf("error while compiling default discovery chain: %w", err) |
|
} |
|
|
|
// Directly insert chain and empty function into the discovery chain maps |
|
snap.ConnectProxy.ConfigSnapshotUpstreams.DiscoveryChain[uid] = chain |
|
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedDiscoveryChains[uid] = func() {} |
|
|
|
if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil { |
|
return snap, fmt.Errorf("error while resetting watches from chain: %w", err) |
|
} |
|
|
|
// Check whether a watch for this peer exists to avoid duplicates. |
|
if _, ok := snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer]; !ok { |
|
peerCtx, cancel := context.WithCancel(ctx) |
|
if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{ |
|
Name: uid.Peer, |
|
Partition: uid.PartitionOrDefault(), |
|
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil { |
|
cancel() |
|
return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) |
|
} |
|
|
|
snap.ConnectProxy.WatchedPeerTrustBundles[uid.Peer] = cancel |
|
} |
|
continue |
|
} |
|
|
|
err = s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{ |
|
Datacenter: s.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: s.token}, |
|
Name: u.DestinationName, |
|
EvaluateInDatacenter: dc, |
|
EvaluateInNamespace: ns, |
|
EvaluateInPartition: partition, |
|
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway), |
|
OverrideProtocol: cfg.Protocol, |
|
OverrideConnectTimeout: cfg.ConnectTimeout(), |
|
}, "discovery-chain:"+uid.String(), s.ch) |
|
if err != nil { |
|
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err) |
|
} |
|
|
|
default: |
|
return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType) |
|
} |
|
} |
|
|
|
return snap, nil |
|
} |
|
|
|
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { |
|
if u.Err != nil { |
|
return fmt.Errorf("error filling agent cache: %v", u.Err) |
|
} |
|
|
|
switch { |
|
case u.CorrelationID == rootsWatchID: |
|
roots, ok := u.Result.(*structs.IndexedCARoots) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
snap.Roots = roots |
|
|
|
case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix): |
|
resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix) |
|
if resp.Bundle != nil { |
|
snap.ConnectProxy.PeerTrustBundles[peer] = resp.Bundle |
|
} |
|
|
|
case u.CorrelationID == intentionsWatchID: |
|
resp, ok := u.Result.(*structs.IndexedIntentionMatches) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
if len(resp.Matches) > 0 { |
|
// RPC supports matching multiple services at once but we only ever |
|
// query with the one service we represent currently so just pick |
|
// the one result set up. |
|
snap.ConnectProxy.Intentions = resp.Matches[0] |
|
} |
|
snap.ConnectProxy.IntentionsSet = true |
|
|
|
case u.CorrelationID == intentionUpstreamsID: |
|
resp, ok := u.Result.(*structs.IndexedServiceList) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response %T", u.Result) |
|
} |
|
|
|
seenUpstreams := make(map[UpstreamID]struct{}) |
|
for _, svc := range resp.Services { |
|
uid := NewUpstreamIDFromServiceName(svc) |
|
|
|
seenUpstreams[uid] = struct{}{} |
|
|
|
cfgMap := make(map[string]interface{}) |
|
u, ok := snap.ConnectProxy.UpstreamConfig[uid] |
|
if ok { |
|
cfgMap = u.Config |
|
} else { |
|
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream |
|
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled |
|
// by the ResolveServiceConfig endpoint. |
|
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace()) |
|
wildcardUID := NewUpstreamIDFromServiceID(wildcardSID) |
|
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID] |
|
if ok { |
|
u = defaults |
|
cfgMap = defaults.Config |
|
snap.ConnectProxy.UpstreamConfig[uid] = defaults |
|
} |
|
} |
|
|
|
cfg, err := parseReducedUpstreamConfig(cfgMap) |
|
if err != nil { |
|
// Don't hard fail on a config typo, just warn. We'll fall back on |
|
// the plain discovery chain if there is an error so it's safe to |
|
// continue. |
|
s.logger.Warn("failed to parse upstream config", |
|
"upstream", uid, |
|
"error", err, |
|
) |
|
} |
|
|
|
meshGateway := s.proxyCfg.MeshGateway |
|
if u != nil { |
|
meshGateway = meshGateway.OverlayWith(u.MeshGateway) |
|
} |
|
watchOpts := discoveryChainWatchOpts{ |
|
id: NewUpstreamIDFromServiceName(svc), |
|
name: svc.Name, |
|
namespace: svc.NamespaceOrDefault(), |
|
partition: svc.PartitionOrDefault(), |
|
datacenter: s.source.Datacenter, |
|
cfg: cfg, |
|
meshGateway: meshGateway, |
|
} |
|
up := &handlerUpstreams{handlerState: s.handlerState} |
|
err = up.watchDiscoveryChain(ctx, snap, watchOpts) |
|
if err != nil { |
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err) |
|
} |
|
} |
|
snap.ConnectProxy.IntentionUpstreams = seenUpstreams |
|
|
|
// Clean up data from services that were not in the update |
|
for uid, targets := range snap.ConnectProxy.WatchedUpstreams { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
for _, cancelFn := range targets { |
|
cancelFn() |
|
} |
|
delete(snap.ConnectProxy.WatchedUpstreams, uid) |
|
} |
|
} |
|
for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid) |
|
} |
|
} |
|
for uid, cancelMap := range snap.ConnectProxy.WatchedGateways { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
for _, cancelFn := range cancelMap { |
|
cancelFn() |
|
} |
|
delete(snap.ConnectProxy.WatchedGateways, uid) |
|
} |
|
} |
|
for uid := range snap.ConnectProxy.WatchedGatewayEndpoints { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid) |
|
} |
|
} |
|
for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
cancelFn() |
|
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid) |
|
} |
|
} |
|
for uid := range snap.ConnectProxy.PassthroughUpstreams { |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
delete(snap.ConnectProxy.PassthroughUpstreams, uid) |
|
} |
|
} |
|
for addr, indexed := range snap.ConnectProxy.PassthroughIndices { |
|
if _, ok := seenUpstreams[indexed.upstreamID]; !ok { |
|
delete(snap.ConnectProxy.PassthroughIndices, addr) |
|
} |
|
} |
|
|
|
// These entries are intentionally handled separately from the WatchedDiscoveryChains above. |
|
// There have been situations where a discovery watch was cancelled, then fired. |
|
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up |
|
// since there was no known watch for it. |
|
for uid := range snap.ConnectProxy.DiscoveryChain { |
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { |
|
continue |
|
} |
|
if _, ok := seenUpstreams[uid]; !ok { |
|
delete(snap.ConnectProxy.DiscoveryChain, uid) |
|
} |
|
} |
|
|
|
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix): |
|
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
pq := strings.TrimPrefix(u.CorrelationID, "upstream:") |
|
uid := UpstreamIDFromString(pq) |
|
snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes |
|
|
|
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix): |
|
resp, ok := u.Result.([]structs.CheckType) |
|
if !ok { |
|
return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result) |
|
} |
|
svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix)) |
|
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp |
|
|
|
default: |
|
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap) |
|
} |
|
return nil |
|
}
|
|
|