mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
463 lines
16 KiB
463 lines
16 KiB
package proxycfg |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types" |
|
"github.com/hashicorp/consul/agent/proxycfg/internal/watch" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/proto/private/pbpeering" |
|
) |
|
|
|
var _ kindHandler = (*handlerAPIGateway)(nil) |
|
|
|
// handlerAPIGateway generates a new ConfigSnapshot in response to |
|
// changes related to an api-gateway. |
|
type handlerAPIGateway struct { |
|
handlerState |
|
} |
|
|
|
// initialize sets up the initial watches needed based on the api-gateway registration |
|
func (h *handlerAPIGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { |
|
snap := newConfigSnapshotFromServiceInstance(h.serviceInstance, h.stateConfig) |
|
|
|
// Watch for root changes |
|
err := h.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ |
|
Datacenter: h.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: h.token}, |
|
Source: *h.source, |
|
}, rootsWatchID, h.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Get information about the entire service mesh. |
|
err = h.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ |
|
Kind: structs.MeshConfig, |
|
Name: structs.MeshConfigMesh, |
|
Datacenter: h.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: h.token}, |
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(h.proxyID.PartitionOrDefault()), |
|
}, meshConfigEntryID, h.ch) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch the api-gateway's config entry |
|
err = h.subscribeToConfigEntry(ctx, structs.APIGateway, h.service, gatewayConfigWatchID) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
// Watch the bound-api-gateway's config entry |
|
err = h.subscribeToConfigEntry(ctx, structs.BoundAPIGateway, h.service, gatewayConfigWatchID) |
|
if err != nil { |
|
return snap, err |
|
} |
|
|
|
snap.APIGateway.Listeners = make(map[string]structs.APIGatewayListener) |
|
snap.APIGateway.BoundListeners = make(map[string]structs.BoundAPIGatewayListener) |
|
snap.APIGateway.HTTPRoutes = watch.NewMap[structs.ResourceReference, *structs.HTTPRouteConfigEntry]() |
|
snap.APIGateway.TCPRoutes = watch.NewMap[structs.ResourceReference, *structs.TCPRouteConfigEntry]() |
|
snap.APIGateway.Certificates = watch.NewMap[structs.ResourceReference, *structs.InlineCertificateConfigEntry]() |
|
|
|
snap.APIGateway.Upstreams = make(listenerRouteUpstreams) |
|
snap.APIGateway.UpstreamsSet = make(routeUpstreamSet) |
|
|
|
// These need to be initialized here but are set by handlerUpstreams |
|
snap.APIGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) |
|
snap.APIGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]() |
|
snap.APIGateway.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{}) |
|
snap.APIGateway.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]() |
|
snap.APIGateway.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) |
|
snap.APIGateway.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) |
|
snap.APIGateway.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) |
|
snap.APIGateway.WatchedLocalGWEndpoints = watch.NewMap[string, structs.CheckServiceNodes]() |
|
snap.APIGateway.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) |
|
snap.APIGateway.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) |
|
|
|
return snap, nil |
|
} |
|
|
|
func (h *handlerAPIGateway) subscribeToConfigEntry(ctx context.Context, kind, name, watchID string) error { |
|
return h.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{ |
|
Kind: kind, |
|
Name: name, |
|
Datacenter: h.source.Datacenter, |
|
QueryOptions: structs.QueryOptions{Token: h.token}, |
|
EnterpriseMeta: h.proxyID.EnterpriseMeta, |
|
}, watchID, h.ch) |
|
} |
|
|
|
// handleUpdate responds to changes in the api-gateway. In general, we want |
|
// to crawl the various resources related to or attached to the gateway and |
|
// collect the list of things need to generate xDS. This list of resources |
|
// includes the bound-api-gateway, http-routes, tcp-routes, and inline-certificates. |
|
func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { |
|
if u.Err != nil { |
|
return fmt.Errorf("error filling agent cache: %v", u.Err) |
|
} |
|
|
|
switch { |
|
case u.CorrelationID == rootsWatchID: |
|
// Handle change in the CA roots |
|
if err := h.handleRootCAUpdate(u, snap); err != nil { |
|
return err |
|
} |
|
case u.CorrelationID == gatewayConfigWatchID: |
|
// Handle change in the api-gateway or bound-api-gateway config entry |
|
if err := h.handleGatewayConfigUpdate(ctx, u, snap); err != nil { |
|
return err |
|
} |
|
case u.CorrelationID == inlineCertificateConfigWatchID: |
|
// Handle change in an attached inline-certificate config entry |
|
if err := h.handleInlineCertConfigUpdate(ctx, u, snap); err != nil { |
|
return err |
|
} |
|
case u.CorrelationID == routeConfigWatchID: |
|
// Handle change in an attached http-route or tcp-route config entry |
|
if err := h.handleRouteConfigUpdate(ctx, u, snap); err != nil { |
|
return err |
|
} |
|
default: |
|
return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// handleRootCAUpdate responds to changes in the watched root CA for a gateway |
|
func (h *handlerAPIGateway) handleRootCAUpdate(u UpdateEvent, snap *ConfigSnapshot) error { |
|
roots, ok := u.Result.(*structs.IndexedCARoots) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} |
|
snap.Roots = roots |
|
return nil |
|
} |
|
|
|
// handleGatewayConfigUpdate responds to changes in the watched config entry for a gateway. |
|
// In particular, we want to make sure that we're subscribing to any attached resources such |
|
// as routes and certificates. These additional subscriptions will enable us to update the |
|
// config snapshot appropriately for any route or certificate changes. |
|
func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { |
|
resp, ok := u.Result.(*structs.ConfigEntryResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} else if resp.Entry == nil { |
|
return nil |
|
} |
|
|
|
switch gwConf := resp.Entry.(type) { |
|
case *structs.BoundAPIGatewayConfigEntry: |
|
snap.APIGateway.BoundGatewayConfig = gwConf |
|
|
|
seenRefs := make(map[structs.ResourceReference]any) |
|
for _, listener := range gwConf.Listeners { |
|
snap.APIGateway.BoundListeners[listener.Name] = listener |
|
|
|
// Subscribe to changes in each attached x-route config entry |
|
for _, ref := range listener.Routes { |
|
seenRefs[ref] = struct{}{} |
|
|
|
ctx, cancel := context.WithCancel(ctx) |
|
switch ref.Kind { |
|
case structs.HTTPRoute: |
|
snap.APIGateway.HTTPRoutes.InitWatch(ref, cancel) |
|
case structs.TCPRoute: |
|
snap.APIGateway.TCPRoutes.InitWatch(ref, cancel) |
|
default: |
|
cancel() |
|
return fmt.Errorf("unexpected route kind on gateway: %s", ref.Kind) |
|
} |
|
|
|
err := h.subscribeToConfigEntry(ctx, ref.Kind, ref.Name, routeConfigWatchID) |
|
if err != nil { |
|
// TODO May want to continue |
|
return err |
|
} |
|
} |
|
|
|
// Subscribe to changes in each attached inline-certificate config entry |
|
for _, ref := range listener.Certificates { |
|
ctx, cancel := context.WithCancel(ctx) |
|
seenRefs[ref] = struct{}{} |
|
snap.APIGateway.Certificates.InitWatch(ref, cancel) |
|
|
|
err := h.subscribeToConfigEntry(ctx, ref.Kind, ref.Name, inlineCertificateConfigWatchID) |
|
if err != nil { |
|
// TODO May want to continue |
|
return err |
|
} |
|
} |
|
} |
|
|
|
// Unsubscribe from any config entries that are no longer attached |
|
snap.APIGateway.HTTPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { |
|
if _, ok := seenRefs[ref]; !ok { |
|
snap.APIGateway.Upstreams.delete(ref) |
|
snap.APIGateway.UpstreamsSet.delete(ref) |
|
snap.APIGateway.HTTPRoutes.CancelWatch(ref) |
|
} |
|
return true |
|
}) |
|
|
|
snap.APIGateway.TCPRoutes.ForEachKey(func(ref structs.ResourceReference) bool { |
|
if _, ok := seenRefs[ref]; !ok { |
|
snap.APIGateway.Upstreams.delete(ref) |
|
snap.APIGateway.UpstreamsSet.delete(ref) |
|
snap.APIGateway.TCPRoutes.CancelWatch(ref) |
|
} |
|
return true |
|
}) |
|
|
|
snap.APIGateway.Certificates.ForEachKey(func(ref structs.ResourceReference) bool { |
|
if _, ok := seenRefs[ref]; !ok { |
|
snap.APIGateway.Certificates.CancelWatch(ref) |
|
} |
|
return true |
|
}) |
|
|
|
snap.APIGateway.BoundGatewayConfigLoaded = true |
|
break |
|
case *structs.APIGatewayConfigEntry: |
|
snap.APIGateway.GatewayConfig = gwConf |
|
|
|
for _, listener := range gwConf.Listeners { |
|
snap.APIGateway.Listeners[listener.Name] = listener |
|
} |
|
|
|
snap.APIGateway.GatewayConfigLoaded = true |
|
break |
|
default: |
|
return fmt.Errorf("invalid type for config entry: %T", resp.Entry) |
|
} |
|
|
|
return h.watchIngressLeafCert(ctx, snap) |
|
} |
|
|
|
// handleInlineCertConfigUpdate stores the certificate for the gateway |
|
func (h *handlerAPIGateway) handleInlineCertConfigUpdate(_ context.Context, u UpdateEvent, snap *ConfigSnapshot) error { |
|
resp, ok := u.Result.(*structs.ConfigEntryResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} else if resp.Entry == nil { |
|
return nil |
|
} |
|
|
|
cfg, ok := resp.Entry.(*structs.InlineCertificateConfigEntry) |
|
if !ok { |
|
return fmt.Errorf("invalid type for config entry: %T", resp.Entry) |
|
} |
|
|
|
ref := structs.ResourceReference{ |
|
Kind: cfg.GetKind(), |
|
Name: cfg.GetName(), |
|
EnterpriseMeta: *cfg.GetEnterpriseMeta(), |
|
} |
|
|
|
snap.APIGateway.Certificates.Set(ref, cfg) |
|
|
|
return nil |
|
} |
|
|
|
// handleRouteConfigUpdate builds the list of upstreams for services on |
|
// the route and watches the related discovery chains. |
|
func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error { |
|
resp, ok := u.Result.(*structs.ConfigEntryResponse) |
|
if !ok { |
|
return fmt.Errorf("invalid type for response: %T", u.Result) |
|
} else if resp.Entry == nil { |
|
return nil |
|
} |
|
|
|
ref := structs.ResourceReference{ |
|
Kind: resp.Entry.GetKind(), |
|
Name: resp.Entry.GetName(), |
|
EnterpriseMeta: *resp.Entry.GetEnterpriseMeta(), |
|
} |
|
|
|
seenUpstreamIDs := make(upstreamIDSet) |
|
upstreams := make(map[APIGatewayListenerKey]structs.Upstreams) |
|
|
|
switch route := resp.Entry.(type) { |
|
case *structs.HTTPRouteConfigEntry: |
|
snap.APIGateway.HTTPRoutes.Set(ref, route) |
|
|
|
for _, rule := range route.Rules { |
|
for _, service := range rule.Services { |
|
for _, listener := range snap.APIGateway.Listeners { |
|
shouldBind := false |
|
for _, parent := range route.Parents { |
|
if h.referenceIsForListener(parent, listener, snap) { |
|
shouldBind = true |
|
break |
|
} |
|
} |
|
if !shouldBind { |
|
continue |
|
} |
|
|
|
upstream := structs.Upstream{ |
|
DestinationName: service.Name, |
|
DestinationNamespace: service.NamespaceOrDefault(), |
|
DestinationPartition: service.PartitionOrDefault(), |
|
LocalBindPort: listener.Port, |
|
// TODO IngressHosts: g.Hosts, |
|
// Pass the protocol that was configured on the listener in order |
|
// to force that protocol on the Envoy listener. |
|
Config: map[string]interface{}{ |
|
"protocol": "http", |
|
}, |
|
} |
|
|
|
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port} |
|
upstreams[listenerKey] = append(upstreams[listenerKey], upstream) |
|
} |
|
|
|
upstreamID := NewUpstreamIDFromServiceName(service.ServiceName()) |
|
seenUpstreamIDs[upstreamID] = struct{}{} |
|
|
|
watchOpts := discoveryChainWatchOpts{ |
|
id: upstreamID, |
|
name: service.Name, |
|
namespace: service.NamespaceOrDefault(), |
|
partition: service.PartitionOrDefault(), |
|
datacenter: h.stateConfig.source.Datacenter, |
|
} |
|
|
|
handler := &handlerUpstreams{handlerState: h.handlerState} |
|
if err := handler.watchDiscoveryChain(ctx, snap, watchOpts); err != nil { |
|
return fmt.Errorf("failed to watch discovery chain for %s: %w", upstreamID, err) |
|
} |
|
} |
|
} |
|
|
|
case *structs.TCPRouteConfigEntry: |
|
snap.APIGateway.TCPRoutes.Set(ref, route) |
|
|
|
for _, service := range route.Services { |
|
upstreamID := NewUpstreamIDFromServiceName(service.ServiceName()) |
|
seenUpstreamIDs.add(upstreamID) |
|
|
|
// For each listener, check if this route should bind and, if so, create an upstream. |
|
for _, listener := range snap.APIGateway.Listeners { |
|
shouldBind := false |
|
for _, parent := range route.Parents { |
|
if h.referenceIsForListener(parent, listener, snap) { |
|
shouldBind = true |
|
break |
|
} |
|
} |
|
if !shouldBind { |
|
continue |
|
} |
|
|
|
upstream := structs.Upstream{ |
|
DestinationName: service.Name, |
|
DestinationNamespace: service.NamespaceOrDefault(), |
|
DestinationPartition: service.PartitionOrDefault(), |
|
LocalBindPort: listener.Port, |
|
// Pass the protocol that was configured on the ingress listener in order |
|
// to force that protocol on the Envoy listener. |
|
Config: map[string]interface{}{ |
|
"protocol": "tcp", |
|
}, |
|
} |
|
|
|
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port} |
|
upstreams[listenerKey] = append(upstreams[listenerKey], upstream) |
|
} |
|
|
|
watchOpts := discoveryChainWatchOpts{ |
|
id: upstreamID, |
|
name: service.Name, |
|
namespace: service.NamespaceOrDefault(), |
|
partition: service.PartitionOrDefault(), |
|
datacenter: h.stateConfig.source.Datacenter, |
|
} |
|
|
|
handler := &handlerUpstreams{handlerState: h.handlerState} |
|
if err := handler.watchDiscoveryChain(ctx, snap, watchOpts); err != nil { |
|
return fmt.Errorf("failed to watch discovery chain for %s: %w", upstreamID, err) |
|
} |
|
} |
|
default: |
|
return fmt.Errorf("invalid type for config entry: %T", resp.Entry) |
|
} |
|
|
|
for listener, set := range upstreams { |
|
snap.APIGateway.Upstreams.set(ref, listener, set) |
|
} |
|
snap.APIGateway.UpstreamsSet.set(ref, seenUpstreamIDs) |
|
//snap.APIGateway.Hosts = TODO |
|
snap.APIGateway.AreHostsSet = true |
|
|
|
// Stop watching any upstreams and discovery chains that have become irrelevant |
|
for upstreamID, cancelDiscoChain := range snap.APIGateway.WatchedDiscoveryChains { |
|
if snap.APIGateway.UpstreamsSet.hasUpstream(upstreamID) { |
|
continue |
|
} |
|
|
|
for targetID, cancelUpstream := range snap.APIGateway.WatchedUpstreams[upstreamID] { |
|
cancelUpstream() |
|
delete(snap.APIGateway.WatchedUpstreams[upstreamID], targetID) |
|
delete(snap.APIGateway.WatchedUpstreamEndpoints[upstreamID], targetID) |
|
|
|
if targetUID := NewUpstreamIDFromTargetID(targetID); targetUID.Peer != "" { |
|
snap.APIGateway.PeerUpstreamEndpoints.CancelWatch(targetUID) |
|
snap.APIGateway.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer) |
|
} |
|
} |
|
|
|
cancelDiscoChain() |
|
delete(snap.APIGateway.WatchedDiscoveryChains, upstreamID) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// referenceIsForListener returns whether the provided structs.ResourceReference |
|
// targets the provided structs.APIGatewayListener. For this to be true, the kind |
|
// and name must match the structs.APIGatewayConfigEntry containing the listener, |
|
// and the reference must specify either no section name or the name of the listener |
|
// as the section name. |
|
// |
|
// TODO This would probably be more generally useful as a helper in the structs pkg |
|
func (h *handlerAPIGateway) referenceIsForListener(ref structs.ResourceReference, listener structs.APIGatewayListener, snap *ConfigSnapshot) bool { |
|
if ref.Kind != structs.APIGateway && ref.Kind != "" { |
|
return false |
|
} |
|
if ref.Name != snap.APIGateway.GatewayConfig.Name { |
|
return false |
|
} |
|
return ref.SectionName == "" || ref.SectionName == listener.Name |
|
} |
|
|
|
func (h *handlerAPIGateway) watchIngressLeafCert(ctx context.Context, snap *ConfigSnapshot) error { |
|
// Note that we DON'T test for TLS.enabled because we need a leaf cert for the |
|
// gateway even without TLS to use as a client cert. |
|
if !snap.APIGateway.GatewayConfigLoaded { |
|
return nil |
|
} |
|
|
|
// Watch the leaf cert |
|
if snap.APIGateway.LeafCertWatchCancel != nil { |
|
snap.APIGateway.LeafCertWatchCancel() |
|
} |
|
ctx, cancel := context.WithCancel(ctx) |
|
err := h.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{ |
|
Datacenter: h.source.Datacenter, |
|
Token: h.token, |
|
Service: h.service, |
|
EnterpriseMeta: h.proxyID.EnterpriseMeta, |
|
}, leafWatchID, h.ch) |
|
if err != nil { |
|
cancel() |
|
return err |
|
} |
|
snap.APIGateway.LeafCertWatchCancel = cancel |
|
|
|
return nil |
|
}
|
|
|