mirror of https://github.com/hashicorp/consul
[API Gateway] Update simple test to leverage intentions and multiple listeners (#16228)
* [API Gateway] Add integration test for conflicted TCP listeners * [API Gateway] Update simple test to leverage intentions and multiple listeners * Fix broken unit test * PR suggestionspull/16236/head^2
parent
4c848a554d
commit
8ff2974dbe
|
@ -62,6 +62,9 @@ func (h *handlerAPIGateway) initialize(ctx context.Context) (ConfigSnapshot, err
|
|||
snap.APIGateway.TCPRoutes = watch.NewMap[structs.ResourceReference, *structs.TCPRouteConfigEntry]()
|
||||
snap.APIGateway.Certificates = watch.NewMap[structs.ResourceReference, *structs.InlineCertificateConfigEntry]()
|
||||
|
||||
snap.APIGateway.Upstreams = make(listenerRouteUpstreams)
|
||||
snap.APIGateway.UpstreamsSet = make(routeUpstreamSet)
|
||||
|
||||
// These need to be initialized here but are set by handlerUpstreams
|
||||
snap.APIGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
|
||||
snap.APIGateway.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
||||
|
@ -192,6 +195,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd
|
|||
// Unsubscribe from any config entries that are no longer attached
|
||||
snap.APIGateway.HTTPRoutes.ForEachKey(func(ref structs.ResourceReference) bool {
|
||||
if _, ok := seenRefs[ref]; !ok {
|
||||
snap.APIGateway.Upstreams.delete(ref)
|
||||
snap.APIGateway.UpstreamsSet.delete(ref)
|
||||
snap.APIGateway.HTTPRoutes.CancelWatch(ref)
|
||||
}
|
||||
return true
|
||||
|
@ -199,6 +204,8 @@ func (h *handlerAPIGateway) handleGatewayConfigUpdate(ctx context.Context, u Upd
|
|||
|
||||
snap.APIGateway.TCPRoutes.ForEachKey(func(ref structs.ResourceReference) bool {
|
||||
if _, ok := seenRefs[ref]; !ok {
|
||||
snap.APIGateway.Upstreams.delete(ref)
|
||||
snap.APIGateway.UpstreamsSet.delete(ref)
|
||||
snap.APIGateway.TCPRoutes.CancelWatch(ref)
|
||||
}
|
||||
return true
|
||||
|
@ -270,7 +277,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||
EnterpriseMeta: *resp.Entry.GetEnterpriseMeta(),
|
||||
}
|
||||
|
||||
seenUpstreamIDs := make(map[UpstreamID]struct{})
|
||||
seenUpstreamIDs := make(upstreamIDSet)
|
||||
upstreams := make(map[APIGatewayListenerKey]structs.Upstreams)
|
||||
|
||||
switch route := resp.Entry.(type) {
|
||||
|
@ -331,7 +338,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||
|
||||
for _, service := range route.Services {
|
||||
upstreamID := NewUpstreamIDFromServiceName(service.ServiceName())
|
||||
seenUpstreamIDs[upstreamID] = struct{}{}
|
||||
seenUpstreamIDs.add(upstreamID)
|
||||
|
||||
// For each listener, check if this route should bind and, if so, create an upstream.
|
||||
for _, listener := range snap.APIGateway.Listeners {
|
||||
|
@ -351,7 +358,6 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||
DestinationNamespace: service.NamespaceOrDefault(),
|
||||
DestinationPartition: service.PartitionOrDefault(),
|
||||
LocalBindPort: listener.Port,
|
||||
//IngressHosts: g.Hosts,
|
||||
// Pass the protocol that was configured on the ingress listener in order
|
||||
// to force that protocol on the Envoy listener.
|
||||
Config: map[string]interface{}{
|
||||
|
@ -380,14 +386,16 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
|
||||
}
|
||||
|
||||
snap.APIGateway.Upstreams = upstreams
|
||||
snap.APIGateway.UpstreamsSet = seenUpstreamIDs
|
||||
for listener, set := range upstreams {
|
||||
snap.APIGateway.Upstreams.set(ref, listener, set)
|
||||
}
|
||||
snap.APIGateway.UpstreamsSet.set(ref, seenUpstreamIDs)
|
||||
//snap.APIGateway.Hosts = TODO
|
||||
snap.APIGateway.AreHostsSet = true
|
||||
|
||||
// Stop watching any upstreams and discovery chains that have become irrelevant
|
||||
for upstreamID, cancelDiscoChain := range snap.APIGateway.WatchedDiscoveryChains {
|
||||
if _, ok := seenUpstreamIDs[upstreamID]; ok {
|
||||
if snap.APIGateway.UpstreamsSet.hasUpstream(upstreamID) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -260,26 +260,40 @@ func (o *configSnapshotAPIGateway) DeepCopy() *configSnapshotAPIGateway {
|
|||
copy(cp.Hosts, o.Hosts)
|
||||
}
|
||||
if o.Upstreams != nil {
|
||||
cp.Upstreams = make(map[IngressListenerKey]structs.Upstreams, len(o.Upstreams))
|
||||
cp.Upstreams = make(map[structs.ResourceReference]listenerUpstreamMap, len(o.Upstreams))
|
||||
for k2, v2 := range o.Upstreams {
|
||||
var cp_Upstreams_v2 structs.Upstreams
|
||||
var cp_Upstreams_v2 listenerUpstreamMap
|
||||
if v2 != nil {
|
||||
cp_Upstreams_v2 = make([]structs.Upstream, len(v2))
|
||||
copy(cp_Upstreams_v2, v2)
|
||||
for i3 := range v2 {
|
||||
{
|
||||
retV := v2[i3].DeepCopy()
|
||||
cp_Upstreams_v2[i3] = *retV
|
||||
cp_Upstreams_v2 = make(map[IngressListenerKey]structs.Upstreams, len(v2))
|
||||
for k3, v3 := range v2 {
|
||||
var cp_Upstreams_v2_v3 structs.Upstreams
|
||||
if v3 != nil {
|
||||
cp_Upstreams_v2_v3 = make([]structs.Upstream, len(v3))
|
||||
copy(cp_Upstreams_v2_v3, v3)
|
||||
for i4 := range v3 {
|
||||
{
|
||||
retV := v3[i4].DeepCopy()
|
||||
cp_Upstreams_v2_v3[i4] = *retV
|
||||
}
|
||||
}
|
||||
}
|
||||
cp_Upstreams_v2[k3] = cp_Upstreams_v2_v3
|
||||
}
|
||||
}
|
||||
cp.Upstreams[k2] = cp_Upstreams_v2
|
||||
}
|
||||
}
|
||||
if o.UpstreamsSet != nil {
|
||||
cp.UpstreamsSet = make(map[UpstreamID]struct{}, len(o.UpstreamsSet))
|
||||
cp.UpstreamsSet = make(map[structs.ResourceReference]upstreamIDSet, len(o.UpstreamsSet))
|
||||
for k2, v2 := range o.UpstreamsSet {
|
||||
cp.UpstreamsSet[k2] = v2
|
||||
var cp_UpstreamsSet_v2 upstreamIDSet
|
||||
if v2 != nil {
|
||||
cp_UpstreamsSet_v2 = make(map[UpstreamID]struct{}, len(v2))
|
||||
for k3, v3 := range v2 {
|
||||
cp_UpstreamsSet_v2[k3] = v3
|
||||
}
|
||||
}
|
||||
cp.UpstreamsSet[k2] = cp_UpstreamsSet_v2
|
||||
}
|
||||
}
|
||||
cp.HTTPRoutes = o.HTTPRoutes.DeepCopy()
|
||||
|
|
|
@ -2,7 +2,6 @@ package proxycfg
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -641,6 +640,55 @@ func (c *configSnapshotMeshGateway) isEmptyPeering() bool {
|
|||
!c.PeeringTrustBundlesSet
|
||||
}
|
||||
|
||||
type upstreamIDSet map[UpstreamID]struct{}
|
||||
|
||||
func (u upstreamIDSet) add(uid UpstreamID) {
|
||||
u[uid] = struct{}{}
|
||||
}
|
||||
|
||||
type routeUpstreamSet map[structs.ResourceReference]upstreamIDSet
|
||||
|
||||
func (r routeUpstreamSet) hasUpstream(uid UpstreamID) bool {
|
||||
for _, set := range r {
|
||||
if _, ok := set[uid]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (r routeUpstreamSet) set(route structs.ResourceReference, set upstreamIDSet) {
|
||||
r[route] = set
|
||||
}
|
||||
|
||||
func (r routeUpstreamSet) delete(route structs.ResourceReference) {
|
||||
delete(r, route)
|
||||
}
|
||||
|
||||
type listenerUpstreamMap map[APIGatewayListenerKey]structs.Upstreams
|
||||
type listenerRouteUpstreams map[structs.ResourceReference]listenerUpstreamMap
|
||||
|
||||
func (l listenerRouteUpstreams) set(route structs.ResourceReference, listener APIGatewayListenerKey, upstreams structs.Upstreams) {
|
||||
if _, ok := l[route]; !ok {
|
||||
l[route] = make(listenerUpstreamMap)
|
||||
}
|
||||
l[route][listener] = upstreams
|
||||
}
|
||||
|
||||
func (l listenerRouteUpstreams) delete(route structs.ResourceReference) {
|
||||
delete(l, route)
|
||||
}
|
||||
|
||||
func (l listenerRouteUpstreams) toUpstreams() map[IngressListenerKey]structs.Upstreams {
|
||||
listeners := make(map[IngressListenerKey]structs.Upstreams, len(l))
|
||||
for _, listenerMap := range l {
|
||||
for listener, set := range listenerMap {
|
||||
listeners[listener] = append(listeners[listener], set...)
|
||||
}
|
||||
}
|
||||
return listeners
|
||||
}
|
||||
|
||||
type configSnapshotAPIGateway struct {
|
||||
ConfigSnapshotUpstreams
|
||||
|
||||
|
@ -669,10 +717,10 @@ type configSnapshotAPIGateway struct {
|
|||
// the GatewayServices RPC to retrieve them.
|
||||
// TODO Determine if this is updated "for free" or not. If not, we might need
|
||||
// to do some work to populate it in handlerAPIGateway
|
||||
Upstreams map[IngressListenerKey]structs.Upstreams
|
||||
Upstreams listenerRouteUpstreams
|
||||
|
||||
// UpstreamsSet is the unique set of UpstreamID the gateway routes to.
|
||||
UpstreamsSet map[UpstreamID]struct{}
|
||||
UpstreamsSet routeUpstreamSet
|
||||
|
||||
HTTPRoutes watch.Map[structs.ResourceReference, *structs.HTTPRouteConfigEntry]
|
||||
TCPRoutes watch.Map[structs.ResourceReference, *structs.TCPRouteConfigEntry]
|
||||
|
@ -733,17 +781,18 @@ func (c *configSnapshotAPIGateway) ToIngress(datacenter string) (configSnapshotI
|
|||
}
|
||||
ingressListener.TLS = tls
|
||||
|
||||
ingressListeners[IngressListenerKey{
|
||||
key := IngressListenerKey{
|
||||
Port: listener.Port,
|
||||
Protocol: string(listener.Protocol),
|
||||
}] = ingressListener
|
||||
}
|
||||
ingressListeners[key] = ingressListener
|
||||
}
|
||||
upstreams := c.DeepCopy().ConfigSnapshotUpstreams
|
||||
upstreams.DiscoveryChain = synthesizedChains
|
||||
snapshotUpstreams := c.DeepCopy().ConfigSnapshotUpstreams
|
||||
snapshotUpstreams.DiscoveryChain = synthesizedChains
|
||||
|
||||
return configSnapshotIngressGateway{
|
||||
Upstreams: c.Upstreams,
|
||||
ConfigSnapshotUpstreams: upstreams,
|
||||
Upstreams: c.Upstreams.toUpstreams(),
|
||||
ConfigSnapshotUpstreams: snapshotUpstreams,
|
||||
GatewayConfigLoaded: true,
|
||||
Listeners: ingressListeners,
|
||||
}, nil
|
||||
|
@ -784,7 +833,7 @@ func (c *configSnapshotAPIGateway) synthesizeChains(datacenter string, protocol
|
|||
}
|
||||
|
||||
if len(chains) == 0 {
|
||||
return nil, nil, errors.New("could not synthesize discovery chain")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
return synthesizer.Synthesize(chains...)
|
||||
|
|
|
@ -73,6 +73,7 @@ func TestAPIGatewaySnapshotToIngressGatewaySnapshot(t *testing.T) {
|
|||
},
|
||||
Listeners: map[IngressListenerKey]structs.IngressListener{},
|
||||
Defaults: structs.IngressServiceConfig{},
|
||||
Upstreams: map[IngressListenerKey]structs.Upstreams{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
|
|||
|
||||
switch snap.Kind {
|
||||
case structs.ServiceKindAPIGateway:
|
||||
if _, ok := snap.APIGateway.UpstreamsSet[uid]; !ok {
|
||||
if !snap.APIGateway.UpstreamsSet.hasUpstream(uid) {
|
||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
|
||||
// The associated watch was likely cancelled.
|
||||
delete(upstreamsSnapshot.DiscoveryChain, uid)
|
||||
|
|
|
@ -7,15 +7,21 @@ kind = "api-gateway"
|
|||
name = "api-gateway"
|
||||
listeners = [
|
||||
{
|
||||
name = "listener-one"
|
||||
port = 9999
|
||||
protocol = "tcp"
|
||||
},
|
||||
{
|
||||
name = "listener-two"
|
||||
port = 9998
|
||||
protocol = "tcp"
|
||||
}
|
||||
]
|
||||
'
|
||||
|
||||
upsert_config_entry primary '
|
||||
kind = "tcp-route"
|
||||
name = "api-gateway-route"
|
||||
name = "api-gateway-route-one"
|
||||
services = [
|
||||
{
|
||||
name = "s1"
|
||||
|
@ -23,12 +29,46 @@ services = [
|
|||
]
|
||||
parents = [
|
||||
{
|
||||
kind = "api-gateway"
|
||||
name = "api-gateway"
|
||||
sectionName = "listener-one"
|
||||
}
|
||||
]
|
||||
'
|
||||
|
||||
upsert_config_entry primary '
|
||||
kind = "tcp-route"
|
||||
name = "api-gateway-route-two"
|
||||
services = [
|
||||
{
|
||||
name = "s2"
|
||||
}
|
||||
]
|
||||
parents = [
|
||||
{
|
||||
name = "api-gateway"
|
||||
sectionName = "listener-two"
|
||||
}
|
||||
]
|
||||
'
|
||||
|
||||
upsert_config_entry primary '
|
||||
kind = "service-intentions"
|
||||
name = "s1"
|
||||
sources {
|
||||
name = "api-gateway"
|
||||
action = "allow"
|
||||
}
|
||||
'
|
||||
|
||||
upsert_config_entry primary '
|
||||
kind = "service-intentions"
|
||||
name = "s2"
|
||||
sources {
|
||||
name = "api-gateway"
|
||||
action = "deny"
|
||||
}
|
||||
'
|
||||
|
||||
register_services primary
|
||||
|
||||
gen_envoy_bootstrap api-gateway 20000 primary true
|
||||
|
|
|
@ -12,11 +12,21 @@ load helpers
|
|||
}
|
||||
|
||||
@test "api gateway should have healthy endpoints for s1" {
|
||||
assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-one
|
||||
assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s1 HEALTHY 1
|
||||
}
|
||||
|
||||
@test "api gateway should have healthy endpoints for s2" {
|
||||
assert_config_entry_status Bound True Bound primary tcp-route api-gateway-route-two
|
||||
assert_upstream_has_endpoints_in_status 127.0.0.1:20000 s2 HEALTHY 1
|
||||
}
|
||||
|
||||
@test "api gateway should be able to connect to s1 via configured port" {
|
||||
run retry_default curl -s -f -d hello localhost:9999
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" == *"hello"* ]]
|
||||
}
|
||||
|
||||
@test "api gateway should get an intentions error connecting to s2 via configured port" {
|
||||
run retry_default must_fail_tcp_connection localhost:9998
|
||||
}
|
Loading…
Reference in New Issue