You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/proxycfg/upstreams.go

617 lines
20 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
package proxycfg
import (
"context"
"fmt"
"strings"
"time"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
Protobuf Refactoring for Multi-Module Cleanliness (#16302) Protobuf Refactoring for Multi-Module Cleanliness This commit includes the following: Moves all packages that were within proto/ to proto/private Rewrites imports to account for the packages being moved Adds in buf.work.yaml to enable buf workspaces Names the proto-public buf module so that we can override the Go package imports within proto/buf.yaml Bumps the buf version dependency to 1.14.0 (I was trying out the version to see if it would get around an issue - it didn't but it also doesn't break things and it seemed best to keep up with the toolchain changes) Why: In the future we will need to consume other protobuf dependencies such as the Google HTTP annotations for openapi generation or grpc-gateway usage. There were some recent changes to have our own ratelimiting annotations. The two combined were not working when I was trying to use them together (attempting to rebase another branch) Buf workspaces should be the solution to the problem Buf workspaces means that each module will have generated Go code that embeds proto file names relative to the proto dir and not the top level repo root. This resulted in proto file name conflicts in the Go global protobuf type registry. The solution to that was to add in a private/ directory into the path within the proto/ directory. That then required rewriting all the imports. Is this safe? AFAICT yes The gRPC wire protocol doesn't seem to care about the proto file names (although the Go grpc code does tack on the proto file name as Metadata in the ServiceDesc) Other than imports, there were no changes to any generated code as a result of this.
2 years ago
"github.com/hashicorp/consul/proto/private/pbpeering"
)
// handlerUpstreams embeds handlerState and carries the upstream-related update
// handling and watch management methods defined in this file.
type handlerUpstreams struct {
handlerState
}
func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
if u.Err != nil {
return fmt.Errorf("error filling agent cache: %v", u.Err)
}
upstreamsSnapshot, err := snap.ToConfigSnapshotUpstreams()
if err != nil {
return err
}
switch {
case u.CorrelationID == leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
upstreamsSnapshot.Leaf = leaf
case u.CorrelationID == meshConfigEntryID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
if resp.Entry != nil {
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}
upstreamsSnapshot.MeshConfig = meshConf
} else {
upstreamsSnapshot.MeshConfig = nil
}
upstreamsSnapshot.MeshConfigSet = true
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
uidString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
uid := UpstreamIDFromString(uidString)
switch snap.Kind {
Implement APIGateway proxycfg snapshot (#16194) * Stub proxycfg handler for API gateway * Add Service Kind constants/handling for API Gateway * Begin stubbing for SDS * Add new Secret type to xDS order of operations * Continue stubbing of SDS * Iterate on proxycfg handler for API gateway * Handle BoundAPIGateway config entry subscription in proxycfg-glue * Add API gateway to config snapshot validation * Add API gateway to config snapshot clone, leaf, etc. * Subscribe to bound route + cert config entries on bound-api-gateway * Track routes + certs on API gateway config snapshot * Generate DeepCopy() for types used in watch.Map * Watch all active references on api-gateway, unwatch inactive * Track loading of initial bound-api-gateway config entry * Use proper proto package for SDS mapping * Use ResourceReference instead of ServiceName, collect resources * Fix typo, add + remove TODOs * Watch discovery chains for TCPRoute * Add TODO for updating gateway services for api-gateway * make proto * Regenerate deep-copy for proxycfg * Set datacenter on upstream ID from query source * Watch discovery chains for http-route service backends * Add ServiceName getter to HTTP+TCP Service structs * Clean up unwatched discovery chains on API Gateway * Implement watch for ingress leaf certificate * Collect upstreams on http-route + tcp-route updates * Remove unused GatewayServices update handler * Remove unnecessary gateway services logic for API Gateway * Remove outdate TODO * Use .ToIngress where appropriate, including TODO for cleaning up * Cancel before returning error * Remove GatewayServices subscription * Add godoc for handlerAPIGateway functions * Update terminology from Connect => Consul Service Mesh Consistent with terminology changes in https://github.com/hashicorp/consul/pull/12690 * Add missing TODO * Remove duplicate switch case * Rerun deep-copy generator * Use correct property on config snapshot * Remove unnecessary leaf cert watch * Clean up based on code review 
feedback * Note handler properties that are initialized but set elsewhere * Add TODO for moving helper func into structs pkg * Update generated DeepCopy code * gofmt * Generate DeepCopy() for API gateway listener types * Improve variable name * Regenerate DeepCopy() code * Fix linting issue * Temporarily remove the secret type from resource generation
2 years ago
case structs.ServiceKindAPIGateway:
if !snap.APIGateway.UpstreamsSet.hasUpstream(uid) {
Implement APIGateway proxycfg snapshot (#16194) * Stub proxycfg handler for API gateway * Add Service Kind constants/handling for API Gateway * Begin stubbing for SDS * Add new Secret type to xDS order of operations * Continue stubbing of SDS * Iterate on proxycfg handler for API gateway * Handle BoundAPIGateway config entry subscription in proxycfg-glue * Add API gateway to config snapshot validation * Add API gateway to config snapshot clone, leaf, etc. * Subscribe to bound route + cert config entries on bound-api-gateway * Track routes + certs on API gateway config snapshot * Generate DeepCopy() for types used in watch.Map * Watch all active references on api-gateway, unwatch inactive * Track loading of initial bound-api-gateway config entry * Use proper proto package for SDS mapping * Use ResourceReference instead of ServiceName, collect resources * Fix typo, add + remove TODOs * Watch discovery chains for TCPRoute * Add TODO for updating gateway services for api-gateway * make proto * Regenerate deep-copy for proxycfg * Set datacenter on upstream ID from query source * Watch discovery chains for http-route service backends * Add ServiceName getter to HTTP+TCP Service structs * Clean up unwatched discovery chains on API Gateway * Implement watch for ingress leaf certificate * Collect upstreams on http-route + tcp-route updates * Remove unused GatewayServices update handler * Remove unnecessary gateway services logic for API Gateway * Remove outdate TODO * Use .ToIngress where appropriate, including TODO for cleaning up * Cancel before returning error * Remove GatewayServices subscription * Add godoc for handlerAPIGateway functions * Update terminology from Connect => Consul Service Mesh Consistent with terminology changes in https://github.com/hashicorp/consul/pull/12690 * Add missing TODO * Remove duplicate switch case * Rerun deep-copy generator * Use correct property on config snapshot * Remove unnecessary leaf cert watch * Clean up based on code review 
feedback * Note handler properties that are initialized but set elsewhere * Add TODO for moving helper func into structs pkg * Update generated DeepCopy code * gofmt * Generate DeepCopy() for API gateway listener types * Improve variable name * Regenerate DeepCopy() code * Fix linting issue * Temporarily remove the secret type from resource generation
2 years ago
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, uid)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
return nil
}
case structs.ServiceKindIngressGateway:
if _, ok := snap.IngressGateway.UpstreamsSet[uid]; !ok {
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, uid)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
return nil
}
case structs.ServiceKindConnectProxy:
explicit := snap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket()
implicit := snap.ConnectProxy.IsImplicitUpstream(uid)
if !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, uid)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
return nil
}
default:
return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind)
}
upstreamsSnapshot.DiscoveryChain[uid] = resp.Chain
if err := s.resetWatchesFromChain(ctx, uid, resp.Chain, upstreamsSnapshot); err != nil {
return err
}
case strings.HasPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
uidString := strings.TrimPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix)
uid := UpstreamIDFromString(uidString)
s.setPeerEndpoints(upstreamsSnapshot, uid, resp.Nodes)
case strings.HasPrefix(u.CorrelationID, peerTrustBundleIDPrefix):
resp, ok := u.Result.(*pbpeering.TrustBundleReadResponse)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix)
if resp.Bundle != nil {
upstreamsSnapshot.UpstreamPeerTrustBundles.Set(peer, resp.Bundle)
}
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
targetID, uidString, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
uid := UpstreamIDFromString(uidString)
s.logger.Debug("upstream-target watch fired",
"correlationID", correlationID,
"nodes", len(resp.Nodes),
)
if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[uid]; !ok {
upstreamsSnapshot.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes
// Skip adding passthroughs unless it's a connect sidecar in tproxy mode.
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
return nil
}
// Clear out this target's existing passthrough upstreams and indices so that they can be repopulated below.
if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; ok {
for addr := range upstreamsSnapshot.PassthroughUpstreams[uid][targetID] {
if indexed := upstreamsSnapshot.PassthroughIndices[addr]; indexed.targetID == targetID && indexed.upstreamID == uid {
delete(upstreamsSnapshot.PassthroughIndices, addr)
}
}
upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
}
passthroughs := make(map[string]struct{})
for _, node := range resp.Nodes {
dialedDirectly := node.Service.Proxy.TransparentProxy.DialedDirectly
// We must do a manual merge here on the DialedDirectly field, because the service-defaults
// and proxy-defaults are not automatically merged into the CheckServiceNodes when in
// agentless mode (because the streaming backend doesn't yet support the MergeCentralConfig field).
if chain := snap.ConnectProxy.DiscoveryChain[uid]; chain != nil {
if target := chain.Targets[targetID]; target != nil {
dialedDirectly = dialedDirectly || target.TransparentProxy.DialedDirectly
}
}
// Skip adding a passthrough for the upstream node if not DialedDirectly.
if !dialedDirectly {
continue
}
// Make sure to use an external address when crossing partition or DC boundaries.
isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
// If node is peered it must be remote
if node.Node.PeerOrEmpty() != "" {
isRemote = true
}
csnIdx, addr, _ := node.BestAddress(isRemote)
existing := upstreamsSnapshot.PassthroughIndices[addr]
if existing.idx > csnIdx {
// The last known instance with this address had a higher index so it takes precedence.
continue
}
// The current instance has a higher Raft index so we ensure the passthrough address is only
// associated with this upstream target. Older associations are cleaned up as needed.
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID], addr)
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID]) == 0 {
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID], existing.targetID)
}
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID]) == 0 {
delete(upstreamsSnapshot.PassthroughUpstreams, existing.upstreamID)
}
upstreamsSnapshot.PassthroughIndices[addr] = indexedTarget{idx: csnIdx, upstreamID: uid, targetID: targetID}
passthroughs[addr] = struct{}{}
}
// Always clear out the existing target passthroughs list so that clusters are cleaned up
// correctly if no entries are populated.
upstreamsSnapshot.PassthroughUpstreams[uid] = make(map[string]map[string]struct{})
if len(passthroughs) > 0 {
// Add the passthroughs to the target if any were found.
upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = passthroughs
}
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
key, uidString, ok := strings.Cut(correlationID, ":")
if ok {
// correlationID formatted with an upstreamID
uid := UpstreamIDFromString(uidString)
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[uid]; !ok {
upstreamsSnapshot.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
upstreamsSnapshot.WatchedGatewayEndpoints[uid][key] = resp.Nodes
} else {
// event was for local gateways only
upstreamsSnapshot.WatchedLocalGWEndpoints.Set(key, resp.Nodes)
}
default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
}
return nil
}
// removeColonPrefix splits s at its first ':' and returns the text before it,
// the text after it, and true. When s contains no ':' it returns two empty
// strings and false.
func removeColonPrefix(s string) (string, string, bool) {
	head, rest, found := strings.Cut(s, ":")
	if !found {
		// strings.Cut would hand back s as the head here; callers expect
		// empty results on failure.
		return "", "", false
	}
	return head, rest, true
}
// setPeerEndpoints stores the endpoints for the peered upstream uid.
//
// The nodes are first passed through hostnameEndpoints. If that yields any
// nodes, only those are stored and uid is flagged in
// PeerUpstreamEndpointsUseHostnames; otherwise the full node list is stored
// and the flag is removed. The flag is only touched when the Set call on
// PeerUpstreamEndpoints reports true.
func (s *handlerUpstreams) setPeerEndpoints(upstreamsSnapshot *ConfigSnapshotUpstreams, uid UpstreamID, nodes structs.CheckServiceNodes) {
	// The gateway key is left empty so it never matches.
	hostnameNodes := hostnameEndpoints(s.logger, GatewayKey{}, nodes)

	if len(hostnameNodes) == 0 {
		if upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, nodes) {
			delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
		}
		return
	}

	if upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, hostnameNodes) {
		upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
	}
}
// resetWatchesFromChain rebuilds every watch that depends on the compiled
// discovery chain of upstream uid.
//
// It cancels all existing target watches for uid, starts a health watch for
// each target of chain, and — in transparent proxy mode, when no target ID
// equals the chain ID — an extra watch for the chain's own service. It then
// starts mesh gateway watches for the gateway keys the targets require and
// stops gateway watches that are no longer required.
//
// It returns an error when chain is nil or when any watch fails to start; the
// underlying cause is wrapped into the returned error.
func (s *handlerUpstreams) resetWatchesFromChain(
	ctx context.Context,
	uid UpstreamID,
	chain *structs.CompiledDiscoveryChain,
	snap *ConfigSnapshotUpstreams,
) error {
	s.logger.Trace("resetting watches for discovery chain", "id", uid)
	if chain == nil {
		return fmt.Errorf("not possible to arrive here with no discovery chain")
	}

	// Initialize relevant sub maps.
	if _, ok := snap.WatchedUpstreams[uid]; !ok {
		snap.WatchedUpstreams[uid] = make(map[string]context.CancelFunc)
	}
	if _, ok := snap.WatchedUpstreamEndpoints[uid]; !ok {
		snap.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
	}
	if _, ok := snap.WatchedGateways[uid]; !ok {
		snap.WatchedGateways[uid] = make(map[string]context.CancelFunc)
	}
	if _, ok := snap.WatchedGatewayEndpoints[uid]; !ok {
		snap.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
	}

	// We could invalidate this selectively based on a hash of the relevant
	// resolver information, but for now just reset anything about this
	// upstream when the chain changes in any way.
	//
	// TODO(rb): content hash based add/remove
	for targetID, cancelFn := range snap.WatchedUpstreams[uid] {
		s.logger.Trace("stopping watch of target",
			"upstream", uid,
			"chain", chain.ServiceName,
			"target", targetID,
		)
		delete(snap.WatchedUpstreams[uid], targetID)
		delete(snap.WatchedUpstreamEndpoints[uid], targetID)
		cancelFn()

		// Peered targets also have endpoint and trust bundle watches to drop.
		targetUID := NewUpstreamIDFromTargetID(targetID)
		if targetUID.Peer != "" {
			snap.PeerUpstreamEndpoints.CancelWatch(targetUID)
			snap.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
		}
	}

	var (
		watchedChainEndpoints bool
		needGateways          = make(map[string]struct{})
	)

	chainID := chain.ID()
	for _, target := range chain.Targets {
		if target.ID == chainID {
			watchedChainEndpoints = true
		}

		opts := targetWatchOpts{
			upstreamID: uid,
			chainID:    target.ID,
			service:    target.Service,
			filter:     target.Subset.Filter,
			datacenter: target.Datacenter,
			peer:       target.Peer,
			entMeta:    target.GetEnterpriseMetadata(),
		}
		// Peering targets do not set the datacenter field, so we should default it here.
		if opts.datacenter == "" {
			opts.datacenter = s.source.Datacenter
		}

		if err := s.watchUpstreamTarget(ctx, snap, opts); err != nil {
			return fmt.Errorf("failed to watch target %q for upstream %q: %w", target.ID, uid, err)
		}

		// We'll get endpoints from the gateway query, but the health still has
		// to come from the backing service query.
		var gk GatewayKey

		switch target.MeshGateway.Mode {
		case structs.MeshGatewayModeRemote:
			gk = GatewayKey{
				Partition:  target.Partition,
				Datacenter: target.Datacenter,
			}
		case structs.MeshGatewayModeLocal:
			gk = GatewayKey{
				Partition:  s.proxyID.PartitionOrDefault(),
				Datacenter: s.source.Datacenter,
			}
		}
		// A gateway is recorded whenever the target's datacenter or partition
		// differs from the proxy's; for any other mode gk is the zero key.
		if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
			needGateways[gk.String()] = struct{}{}
		}
		// Register a local gateway watch if any targets are pointing to a peer and require a mode of local.
		if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal {
			// Surface the failure like every other watch setup error in this
			// function instead of silently dropping it.
			if err := s.setupWatchForLocalGWEndpoints(ctx, snap); err != nil {
				return err
			}
		}
	}

	// If the discovery chain's targets do not lead to watching all endpoints
	// for the upstream, then create a separate watch for those too.
	// This is needed in transparent mode because if there is some service A that
	// redirects to service B, the dialing proxy needs to associate A's virtual IP
	// with A's discovery chain.
	//
	// Outside of transparent mode we only watch the chain target, B,
	// since A is a virtual service and traffic will not be sent to it.
	if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent {
		chainEntMeta := acl.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace)

		opts := targetWatchOpts{
			upstreamID: uid,
			chainID:    chainID,
			service:    chain.ServiceName,
			filter:     "",
			datacenter: chain.Datacenter,
			entMeta:    &chainEntMeta,
		}
		if err := s.watchUpstreamTarget(ctx, snap, opts); err != nil {
			return fmt.Errorf("failed to watch target %q for upstream %q: %w", chainID, uid, err)
		}
	}

	// Start watches for gateways that are needed but not yet watched.
	for key := range needGateways {
		if _, ok := snap.WatchedGateways[uid][key]; ok {
			continue
		}
		gwKey := gatewayKeyFromString(key)

		s.logger.Trace("initializing watch of mesh gateway",
			"upstream", uid,
			"chain", chain.ServiceName,
			"datacenter", gwKey.Datacenter,
			"partition", gwKey.Partition,
		)

		// Each gateway watch gets its own cancellable child of ctx.
		gwCtx, cancel := context.WithCancel(ctx)
		opts := gatewayWatchOpts{
			internalServiceDump: s.dataSources.InternalServiceDump,
			notifyCh:            s.ch,
			source:              *s.source,
			token:               s.token,
			key:                 gwKey,
			upstreamID:          uid,
		}
		if err := watchMeshGateway(gwCtx, opts); err != nil {
			cancel()
			return err
		}

		snap.WatchedGateways[uid][key] = cancel
	}

	// Stop watches for gateways that are no longer needed.
	for key, cancelFn := range snap.WatchedGateways[uid] {
		if _, ok := needGateways[key]; ok {
			continue
		}
		gwKey := gatewayKeyFromString(key)

		s.logger.Trace("stopping watch of mesh gateway",
			"upstream", uid,
			"chain", chain.ServiceName,
			"datacenter", gwKey.Datacenter,
			"partition", gwKey.Partition,
		)
		delete(snap.WatchedGateways[uid], key)
		delete(snap.WatchedGatewayEndpoints[uid], key)
		cancelFn()
	}

	return nil
}
// targetWatchOpts collects the parameters watchUpstreamTarget uses to start a
// health watch for a single discovery chain target.
type targetWatchOpts struct {
// upstreamID is the upstream the target belongs to; it keys
// snap.WatchedUpstreams and is part of the non-peered correlation ID.
upstreamID UpstreamID
// chainID is the ID of the target (or of the chain itself) being watched.
chainID string
// service is sent as the ServiceName of the health request.
service string
// filter is sent as the query filter of the health request.
filter string
// datacenter is sent as the Datacenter of the health request.
datacenter string
// peer is sent as the PeerName of the health request; when non-empty the
// watch is treated as a peered upstream watch.
peer string
// entMeta is merged into the request's EnterpriseMeta.
entMeta *acl.EnterpriseMeta
}
// watchUpstreamTarget starts a health watch for the target described by opts
// and records its cancel function under
// snap.WatchedUpstreams[opts.upstreamID][opts.chainID].
//
// For non-peered targets the correlation ID is
// "upstream-target:<chainID>:<upstreamID>". For peered targets the upstream ID
// is derived from the chain ID and the correlation ID uses
// upstreamPeerWatchIDPrefix. When the resulting upstream ID carries a peer,
// the peer endpoint watch is registered and, unless one already exists, a
// trust bundle watch for that peer is started.
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
	s.logger.Trace("initializing watch of target",
		"upstream", opts.upstreamID,
		"chain", opts.service,
		"target", opts.chainID,
	)

	var (
		uid           = opts.upstreamID
		correlationID string
	)
	if opts.peer == "" {
		correlationID = "upstream-target:" + opts.chainID + ":" + uid.String()
	} else {
		uid = NewUpstreamIDFromTargetID(opts.chainID)
		correlationID = upstreamPeerWatchIDPrefix + uid.String()
	}

	// Merging into a zero value means a nil opts.entMeta never reaches the request.
	var entMeta acl.EnterpriseMeta
	entMeta.Merge(opts.entMeta)

	targetCtx, cancelTarget := context.WithCancel(ctx)
	req := &structs.ServiceSpecificRequest{
		PeerName:   opts.peer,
		Datacenter: opts.datacenter,
		QueryOptions: structs.QueryOptions{
			Token:  s.token,
			Filter: opts.filter,
		},
		ServiceName: opts.service,
		Connect:     true,
		// Note that Identifier doesn't type-prefix for service any more as it's
		// the default and makes metrics and other things much cleaner. It's
		// simpler for us if we have the type to make things unambiguous.
		Source:         *s.source,
		EnterpriseMeta: entMeta,
	}
	if err := s.dataSources.Health.Notify(targetCtx, req, correlationID, s.ch); err != nil {
		cancelTarget()
		return err
	}
	snap.WatchedUpstreams[opts.upstreamID][opts.chainID] = cancelTarget

	// Everything below only applies to peered upstreams.
	if uid.Peer == "" {
		return nil
	}

	if !snap.PeerUpstreamEndpoints.IsWatched(uid) {
		snap.PeerUpstreamEndpoints.InitWatch(uid, cancelTarget)
	}

	// Check whether a watch for this peer exists to avoid duplicates.
	if snap.UpstreamPeerTrustBundles.IsWatched(uid.Peer) {
		return nil
	}

	// The trust bundle context is a child of the target watch context, so
	// cancelling the target watch cancels it as well.
	peerCtx, cancelPeer := context.WithCancel(targetCtx)
	bundleReq := &cachetype.TrustBundleReadRequest{
		Request: &pbpeering.TrustBundleReadRequest{
			Name:      uid.Peer,
			Partition: uid.PartitionOrDefault(),
		},
		QueryOptions: structs.QueryOptions{Token: s.token},
	}
	if err := s.dataSources.TrustBundle.Notify(peerCtx, bundleReq, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
		cancelPeer()
		return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
	}
	snap.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancelPeer)

	return nil
}
// discoveryChainWatchOpts collects the parameters watchDiscoveryChain uses to
// start a compiled discovery chain watch for one upstream.
type discoveryChainWatchOpts struct {
// id identifies the upstream; it keys the watched-chains map and is
// appended to the "discovery-chain:" correlation ID.
id UpstreamID
// name is sent as the Name of the discovery chain request.
name string
// namespace is sent as EvaluateInNamespace.
namespace string
// partition is sent as EvaluateInPartition.
partition string
// datacenter is sent as EvaluateInDatacenter.
datacenter string
// cfg supplies the OverrideProtocol and OverrideConnectTimeout values.
cfg reducedUpstreamConfig
// meshGateway is sent as OverrideMeshGateway.
meshGateway structs.MeshGatewayConfig
}
func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
var watchedDiscoveryChains map[UpstreamID]context.CancelFunc
switch s.kind {
Implement APIGateway proxycfg snapshot (#16194) * Stub proxycfg handler for API gateway * Add Service Kind constants/handling for API Gateway * Begin stubbing for SDS * Add new Secret type to xDS order of operations * Continue stubbing of SDS * Iterate on proxycfg handler for API gateway * Handle BoundAPIGateway config entry subscription in proxycfg-glue * Add API gateway to config snapshot validation * Add API gateway to config snapshot clone, leaf, etc. * Subscribe to bound route + cert config entries on bound-api-gateway * Track routes + certs on API gateway config snapshot * Generate DeepCopy() for types used in watch.Map * Watch all active references on api-gateway, unwatch inactive * Track loading of initial bound-api-gateway config entry * Use proper proto package for SDS mapping * Use ResourceReference instead of ServiceName, collect resources * Fix typo, add + remove TODOs * Watch discovery chains for TCPRoute * Add TODO for updating gateway services for api-gateway * make proto * Regenerate deep-copy for proxycfg * Set datacenter on upstream ID from query source * Watch discovery chains for http-route service backends * Add ServiceName getter to HTTP+TCP Service structs * Clean up unwatched discovery chains on API Gateway * Implement watch for ingress leaf certificate * Collect upstreams on http-route + tcp-route updates * Remove unused GatewayServices update handler * Remove unnecessary gateway services logic for API Gateway * Remove outdate TODO * Use .ToIngress where appropriate, including TODO for cleaning up * Cancel before returning error * Remove GatewayServices subscription * Add godoc for handlerAPIGateway functions * Update terminology from Connect => Consul Service Mesh Consistent with terminology changes in https://github.com/hashicorp/consul/pull/12690 * Add missing TODO * Remove duplicate switch case * Rerun deep-copy generator * Use correct property on config snapshot * Remove unnecessary leaf cert watch * Clean up based on code review 
feedback * Note handler properties that are initialized but set elsewhere * Add TODO for moving helper func into structs pkg * Update generated DeepCopy code * gofmt * Generate DeepCopy() for API gateway listener types * Improve variable name * Regenerate DeepCopy() code * Fix linting issue * Temporarily remove the secret type from resource generation
2 years ago
case structs.ServiceKindAPIGateway:
watchedDiscoveryChains = snap.APIGateway.WatchedDiscoveryChains
case structs.ServiceKindIngressGateway:
watchedDiscoveryChains = snap.IngressGateway.WatchedDiscoveryChains
case structs.ServiceKindConnectProxy:
watchedDiscoveryChains = snap.ConnectProxy.WatchedDiscoveryChains
default:
return fmt.Errorf("unsupported kind %s", s.kind)
}
if _, ok := watchedDiscoveryChains[opts.id]; ok {
return nil
}
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: opts.name,
EvaluateInDatacenter: opts.datacenter,
EvaluateInNamespace: opts.namespace,
EvaluateInPartition: opts.partition,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
OverrideMeshGateway: opts.meshGateway,
}, "discovery-chain:"+opts.id.String(), s.ch)
if err != nil {
cancel()
return err
}
watchedDiscoveryChains[opts.id] = cancel
return nil
}
// reducedUpstreamConfig holds the few opaque upstream config values that the
// discovery chain now manages but that, for backwards compatibility, must
// still influence how the proxy is configured.
//
// The full-blown config is agent/xds.UpstreamConfig
type reducedUpstreamConfig struct {
	// Protocol is decoded from the "protocol" key.
	Protocol string `mapstructure:"protocol"`
	// ConnectTimeoutMs is decoded from the "connect_timeout_ms" key and is
	// interpreted as a number of milliseconds.
	ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
}

// ConnectTimeout returns ConnectTimeoutMs converted to a time.Duration.
func (c *reducedUpstreamConfig) ConnectTimeout() time.Duration {
	millis := time.Duration(c.ConnectTimeoutMs)
	return millis * time.Millisecond
}
// parseReducedUpstreamConfig decodes m into a reducedUpstreamConfig using
// mapstructure.WeakDecode. When decoding fails, the config as left by the
// decoder is returned together with the error.
func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig, error) {
	cfg := reducedUpstreamConfig{}
	if err := mapstructure.WeakDecode(m, &cfg); err != nil {
		return cfg, err
	}
	return cfg, nil
}
// setupWatchForLocalGWEndpoints starts a mesh gateway watch keyed by the
// proxy's own partition and the source datacenter, unless
// upstreams.WatchedLocalGWEndpoints already reports that key as watched.
//
// The watch is started on ctx directly and is registered with a nil cancel
// function. A failure to start the watch is returned wrapped.
func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
	ctx context.Context,
	upstreams *ConfigSnapshotUpstreams,
) error {
	localGW := GatewayKey{
		Partition:  s.proxyID.PartitionOrDefault(),
		Datacenter: s.source.Datacenter,
	}
	watchKey := localGW.String()

	// If the watch is already initialized, do nothing.
	if upstreams.WatchedLocalGWEndpoints.IsWatched(watchKey) {
		return nil
	}

	// No upstreamID is set on these options.
	err := watchMeshGateway(ctx, gatewayWatchOpts{
		internalServiceDump: s.dataSources.InternalServiceDump,
		notifyCh:            s.ch,
		source:              *s.source,
		token:               s.token,
		key:                 localGW,
	})
	if err != nil {
		return fmt.Errorf("error while watching for local mesh gateway: %w", err)
	}

	upstreams.WatchedLocalGWEndpoints.InitWatch(watchKey, nil)
	return nil
}