From c31c1158a66e34ab0ddb0e355fa224005c415a26 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 27 Jan 2022 18:56:47 -0700
Subject: [PATCH 1/5] Add failing test

The updated test fails because passthrough upstream addresses are not
being cleaned up.
---
 agent/proxycfg/state_test.go | 75 ++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go
index 2ef27310f9..7383cf7658 100644
--- a/agent/proxycfg/state_test.go
+++ b/agent/proxycfg/state_test.go
@@ -2041,6 +2041,80 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 					require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1")
 				},
 			},
+			{
+				// Receive a new upstream target event without proxy1.
+				events: []cache.UpdateEvent{
+					{
+						CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
+						Result: &structs.IndexedCheckServiceNodes{
+							Nodes: structs.CheckServiceNodes{
+								{
+									Node: &structs.Node{
+										Node:    "node2",
+										Address: "10.0.0.2",
+									},
+									Service: &structs.NodeService{
+										Kind:    structs.ServiceKindConnectProxy,
+										ID:      "db-sidecar-proxy2",
+										Service: "db-sidecar-proxy",
+										Proxy: structs.ConnectProxyConfig{
+											DestinationServiceName: "db",
+											TransparentProxy: structs.TransparentProxyConfig{
+												DialedDirectly: true,
+											},
+										},
+									},
+								},
+							},
+						},
+						Err: nil,
+					},
+				},
+				verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID)
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1")
+
+					// The endpoint and passthrough address for proxy1 should be gone.
+					require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"],
+						structs.CheckServiceNodes{
+							{
+								Node: &structs.Node{
+									Node:    "node2",
+									Address: "10.0.0.2",
+								},
+								Service: &structs.NodeService{
+									Kind:    structs.ServiceKindConnectProxy,
+									ID:      "db-sidecar-proxy2",
+									Service: "db-sidecar-proxy",
+									Proxy: structs.ConnectProxyConfig{
+										DestinationServiceName: "db",
+										TransparentProxy: structs.TransparentProxyConfig{
+											DialedDirectly: true,
+										},
+									},
+								},
+							},
+						},
+					)
+					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
+						dbUID: {
+							SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
+							SpiffeID: connect.SpiffeIDService{
+								Host:       snap.Roots.TrustDomain,
+								Namespace:  db.NamespaceOrDefault(),
+								Partition:  db.PartitionOrDefault(),
+								Datacenter: snap.Datacenter,
+								Service:    "db",
+							},
+							Addrs: map[string]struct{}{
+								"10.0.0.2": {},
+							},
+						},
+					})
+				},
+			},
 			// Empty list of upstreams should clean everything up
 			{
 				requiredWatches: map[string]verifyWatchRequest{
@@ -2070,6 +2144,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 					require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
 					require.Empty(t, snap.ConnectProxy.DiscoveryChain)
 					require.Empty(t, snap.ConnectProxy.IntentionUpstreams)
+					require.Empty(t, snap.ConnectProxy.PassthroughUpstreams)
 				},
 			},
 		},

From 659ebc05a99e0457c02b1fe46c4efd508aa688a5 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 27 Jan 2022 20:52:26 -0700
Subject: [PATCH 2/5] Ensure passthrough addresses get cleaned up

Transparent proxies can set up filter chains that allow direct
connections to upstream service instances.
Services that can be dialed directly are stored in the
PassthroughUpstreams map of the proxycfg snapshot.

Previously these addresses were not being cleaned up based on new
service health data. The list of addresses associated with an upstream
service would only ever grow.

As services scale up and down, eventually they will have instances
assigned to an IP that was previously assigned to a different service.
When IP addresses are duplicated across filter chain match rules, the
listener config will be rejected by Envoy.

This commit updates the proxycfg snapshot management so that
passthrough addresses can get cleaned up when no longer associated
with a given upstream.

There is still the possibility of a race condition where, due to
timing, an address is shared between multiple passthrough upstreams.
That concern is mitigated by #12195, but will be further addressed in
a follow-up.
---
 agent/proxycfg/connect_proxy.go               |  8 ++-
 agent/proxycfg/manager_test.go                |  4 +-
 agent/proxycfg/naming.go                      | 19 ++++++
 agent/proxycfg/naming_test.go                 | 37 ++++++++++
 agent/proxycfg/snapshot.go                    | 18 +----
 agent/proxycfg/state_test.go                  | 25 ++-----
 agent/proxycfg/upstreams.go                   | 65 +++++------------
 agent/xds/clusters.go                         | 67 ++++++++++++-------
 agent/xds/clusters_test.go                    | 20 +-----
 agent/xds/listeners.go                        | 45 +++++++------
 agent/xds/listeners_test.go                   |  8 +--
 ...ial-instances-directly.envoy-1-20-x.golden | 12 ++--
 ...ial-instances-directly.envoy-1-20-x.golden |  4 +-
 13 files changed, 169 insertions(+), 163 deletions(-)

diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go
index 629c416425..12b7028d63 100644
--- a/agent/proxycfg/connect_proxy.go
+++ b/agent/proxycfg/connect_proxy.go
@@ -27,7 +27,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
 	snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
 	snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
 	snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
-	snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs)
+	snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
 
 	// Watch for root changes
 	err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
@@ -326,6 +326,12 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
 			delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
 		}
 	}
+	for uid, _ := range snap.ConnectProxy.PassthroughUpstreams {
+		if _, ok := seenUpstreams[uid]; !ok {
+			delete(snap.ConnectProxy.PassthroughUpstreams, uid)
+		}
+	}
+
 	// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
 	// There have been situations where a discovery watch was cancelled, then fired.
 	// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go
index 8ee2a62c35..c514794967 100644
--- a/agent/proxycfg/manager_test.go
+++ b/agent/proxycfg/manager_test.go
@@ -234,7 +234,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 					NewUpstreamID(&upstreams[1]): &upstreams[1],
 					NewUpstreamID(&upstreams[2]): &upstreams[2],
 				},
-				PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
+				PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
 			},
 			PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 			WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},
@@ -292,7 +292,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 					NewUpstreamID(&upstreams[1]): &upstreams[1],
 					NewUpstreamID(&upstreams[2]): &upstreams[2],
 				},
-				PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
+				PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
 			},
 			PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 			WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},
diff --git a/agent/proxycfg/naming.go b/agent/proxycfg/naming.go
index 5304b8d1a6..5a5f209758 100644
--- a/agent/proxycfg/naming.go
+++ b/agent/proxycfg/naming.go
@@ -45,6 +45,25 @@ func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID {
 	return id
 }
 
+func NewUpstreamIDFromTargetID(tid string) UpstreamID {
+	// Drop the leading subset if one is present in the target ID.
+	separators := strings.Count(tid, ".")
+	if separators > 3 {
+		prefix := tid[:strings.Index(tid, ".")+1]
+		tid = strings.TrimPrefix(tid, prefix)
+	}
+
+	split := strings.SplitN(tid, ".", 4)
+
+	id := UpstreamID{
+		Name:           split[0],
+		EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(split[2], split[1]),
+		Datacenter:     split[3],
+	}
+	id.normalize()
+	return id
+}
+
 func (u *UpstreamID) normalize() {
 	if u.Type == structs.UpstreamDestTypeService {
 		u.Type = ""
diff --git a/agent/proxycfg/naming_test.go b/agent/proxycfg/naming_test.go
index d74ae7e05c..a8ad9a6e71 100644
--- a/agent/proxycfg/naming_test.go
+++ b/agent/proxycfg/naming_test.go
@@ -8,6 +8,43 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 )
 
+// TODO(freddy): Needs enterprise test
+func TestUpstreamIDFromTargetID(t *testing.T) {
+	type testcase struct {
+		tid    string
+		expect UpstreamID
+	}
+	run := func(t *testing.T, tc testcase) {
+		tc.expect.EnterpriseMeta.Normalize()
+
+		got := NewUpstreamIDFromTargetID(tc.tid)
+		require.Equal(t, tc.expect, got)
+	}
+
+	cases := map[string]testcase{
+		"with subset": {
+			tid: "v1.foo.default.default.dc2",
+			expect: UpstreamID{
+				Name:       "foo",
+				Datacenter: "dc2",
+			},
+		},
+		"without subset": {
+			tid: "foo.default.default.dc2",
+			expect: UpstreamID{
+				Name:       "foo",
+				Datacenter: "dc2",
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			run(t, tc)
+		})
+	}
+}
+
 func TestUpstreamIDFromString(t *testing.T) {
 	type testcase struct {
 		id string
diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go
index 35bea81f57..1b4e0f6bcb 100644
--- a/agent/proxycfg/snapshot.go
+++ b/agent/proxycfg/snapshot.go
@@ -9,7 +9,6 @@ import (
 	"github.com/mitchellh/copystructure"
 
 	"github.com/hashicorp/consul/acl"
-	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/structs"
 )
 
@@ -52,8 +51,9 @@ type ConfigSnapshotUpstreams struct {
 	// UpstreamConfig is a map to an upstream's configuration.
 	UpstreamConfig map[UpstreamID]*structs.Upstream
 
-	// PassthroughUpstreams is a map of: UpstreamID -> ServicePassthroughAddrs.
-	PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs
+	// PassthroughUpstreams is a map of: UpstreamID -> (map of TargetID ->
+	// (set of IP addresses)).
+	PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{}
 
 	// IntentionUpstreams is a set of upstreams inferred from intentions.
 	//
@@ -91,18 +91,6 @@ func gatewayKeyFromString(s string) GatewayKey {
 	return GatewayKey{Partition: split[0], Datacenter: split[1]}
 }
 
-// ServicePassthroughAddrs contains the LAN addrs
-type ServicePassthroughAddrs struct {
-	// SNI is the Service SNI of the upstream.
-	SNI string
-
-	// SpiffeID is the SPIFFE ID to use for upstream SAN validation.
-	SpiffeID connect.SpiffeIDService
-
-	// Addrs is a set of the best LAN addresses for the instances of the upstream.
-	Addrs map[string]struct{}
-}
-
 type configSnapshotConnectProxy struct {
 	ConfigSnapshotUpstreams
 
diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go
index 7383cf7658..1e1a6269e0 100644
--- a/agent/proxycfg/state_test.go
+++ b/agent/proxycfg/state_test.go
@@ -12,7 +12,6 @@ import (
 	"github.com/hashicorp/consul/agent/cache"
 	cachetype "github.com/hashicorp/consul/agent/cache-types"
-	"github.com/hashicorp/consul/agent/connect"
 	"github.com/hashicorp/consul/agent/consul/discoverychain"
 	"github.com/hashicorp/consul/agent/rpcclient/health"
 	"github.com/hashicorp/consul/agent/structs"
@@ -1985,17 +1984,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 					// The LAN service address is used below because transparent proxying
 					// does not support querying service nodes in other DCs, and the WAN address
 					// should not be used in DC-local calls.
-					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
+					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
 						dbUID: {
-							SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
-							SpiffeID: connect.SpiffeIDService{
-								Host:       snap.Roots.TrustDomain,
-								Namespace:  db.NamespaceOrDefault(),
-								Partition:  db.PartitionOrDefault(),
-								Datacenter: snap.Datacenter,
-								Service:    "db",
-							},
-							Addrs: map[string]struct{}{
+							"db.default.default.dc1": map[string]struct{}{
 								"10.10.10.10": {},
 								"10.0.0.2":    {},
 							},
@@ -2098,17 +2089,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 						},
 					},
 					)
-					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
+					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
 						dbUID: {
-							SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
-							SpiffeID: connect.SpiffeIDService{
-								Host:       snap.Roots.TrustDomain,
-								Namespace:  db.NamespaceOrDefault(),
-								Partition:  db.PartitionOrDefault(),
-								Datacenter: snap.Datacenter,
-								Service:    "db",
-							},
-							Addrs: map[string]struct{}{
+							"db.default.default.dc1": map[string]struct{}{
 								"10.0.0.2": {},
 							},
 						},
diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go
index c923546c78..33b8015162 100644
--- a/agent/proxycfg/upstreams.go
+++ b/agent/proxycfg/upstreams.go
@@ -9,8 +9,6 @@ import (
 	"github.com/mitchellh/mapstructure"
 
 	"github.com/hashicorp/consul/agent/cache"
-	"github.com/hashicorp/consul/agent/connect"
-	cachetype "github.com/hashicorp/consul/agent/cache-types"
 	"github.com/hashicorp/consul/agent/structs"
 )
 
@@ -92,56 +90,25 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
 		}
 		upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes
 
-		var passthroughAddrs map[string]ServicePassthroughAddrs
+		if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
+			return nil
+		}
+
+		if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; !ok {
+			upstreamsSnapshot.PassthroughUpstreams[uid] = make(map[string]map[string]struct{})
+		}
+		upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
 
 		for _, node := range resp.Nodes {
-			if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
-				if passthroughAddrs == nil {
-					passthroughAddrs = make(map[string]ServicePassthroughAddrs)
-				}
-
-				svc := node.Service.CompoundServiceName()
-
-				// Overwrite the name if it's a connect proxy (as opposed to Connect native).
-				// We don't reference the proxy name directly for things like SNI, but rather the name
-				// of the destination. The enterprise meta of a proxy will always be the same as that of
-				// the destination service, so that remains intact.
-				if node.Service.Kind == structs.ServiceKindConnectProxy {
-					dst := node.Service.Proxy.DestinationServiceName
-					if dst == "" {
-						dst = node.Service.Proxy.DestinationServiceID
-					}
-					svc.Name = dst
-				}
-
-				sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), snap.Datacenter, snap.Roots.TrustDomain)
-
-				spiffeID := connect.SpiffeIDService{
-					Host:       snap.Roots.TrustDomain,
-					Partition:  svc.PartitionOrDefault(),
-					Namespace:  svc.NamespaceOrDefault(),
-					Datacenter: snap.Datacenter,
-					Service:    svc.Name,
-				}
-
-				svcUID := NewUpstreamIDFromServiceName(svc)
-				if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok {
-					upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{
-						SNI:      sni,
-						SpiffeID: spiffeID,
-
-						// Stored in a set because it's possible for these to be duplicated
-						// when the upstream-target is targeted by multiple discovery chains.
-						Addrs: make(map[string]struct{}),
-					}
-				}
-
-				// Make sure to use an external address when crossing partitions.
-				isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
-				addr, _ := node.BestAddress(isRemote)
-
-				upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{}
+			if !node.Service.Proxy.TransparentProxy.DialedDirectly {
+				continue
 			}
+
+			// Make sure to use an external address when crossing partitions.
+			// Datacenter is not considered because transparent proxies cannot dial other datacenters.
+			isRemote := !structs.EqualPartitions(node.Node.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
+			addr, _ := node.BestAddress(isRemote)
+			upstreamsSnapshot.PassthroughUpstreams[uid][targetID][addr] = struct{}{}
 		}
 
 	case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go
index 4d33bf3779..ab27bc9b5f 100644
--- a/agent/xds/clusters.go
+++ b/agent/xds/clusters.go
@@ -171,36 +171,51 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message,
 		})
 	}
 
-	for _, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
-		// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
- name := "passthrough~" + passthrough.SNI + for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams { + for tid := range target { + uid := proxycfg.NewUpstreamIDFromTargetID(tid) - c := envoy_cluster_v3.Cluster{ - Name: name, - ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{ - Type: envoy_cluster_v3.Cluster_ORIGINAL_DST, - }, - LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED, + sni := connect.ServiceSNI( + uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain) - // TODO(tproxy) This should use the connection timeout configured on the upstream's config entry - ConnectTimeout: ptypes.DurationProto(5 * time.Second), - } + // Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream. + name := "passthrough~" + sni - commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf()) - err := injectSANMatcher(commonTLSContext, passthrough.SpiffeID) - if err != nil { - return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", passthrough.SNI, err) + c := envoy_cluster_v3.Cluster{ + Name: name, + ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{ + Type: envoy_cluster_v3.Cluster_ORIGINAL_DST, + }, + LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED, + + // TODO(tproxy) This should use the connection timeout configured on the upstream's config entry + ConnectTimeout: ptypes.DurationProto(5 * time.Second), + } + + spiffeID := connect.SpiffeIDService{ + Host: cfgSnap.Roots.TrustDomain, + Partition: uid.PartitionOrDefault(), + Namespace: uid.NamespaceOrDefault(), + Datacenter: cfgSnap.Datacenter, + Service: uid.Name, + } + + commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf()) + err := injectSANMatcher(commonTLSContext, spiffeID) + if err != nil { + return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err) + } + tlsContext := envoy_tls_v3.UpstreamTlsContext{ + CommonTlsContext: commonTLSContext, + Sni: sni, + } + transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext) + if err != nil { + return nil, err + } + c.TransportSocket = transportSocket + clusters = append(clusters, &c) } - tlsContext := envoy_tls_v3.UpstreamTlsContext{ - CommonTlsContext: commonTLSContext, - Sni: passthrough.SNI, - } - transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext) - if err != nil { - return nil, err - } - c.TransportSocket = transportSocket - clusters = append(clusters, &c) } return clusters, nil diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 971407efcb..758ca3d032 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -678,28 +678,14 @@ func TestClustersFromSnapshot(t *testing.T) { } // We add a passthrough cluster for each upstream service name - snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{ + snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{ kafkaUID: { - SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", - SpiffeID: connect.SpiffeIDService{ - Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", - Namespace: "default", - Datacenter: "dc1", - Service: "kafka", - }, - Addrs: map[string]struct{}{ + "kafka.default.default.dc1": map[string]struct{}{ "9.9.9.9": {}, }, }, mongoUID: { - SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", - SpiffeID: connect.SpiffeIDService{ - 
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", - Namespace: "default", - Datacenter: "dc1", - Service: "mongo", - }, - Addrs: map[string]struct{}{ + "mongo.default.default.dc1": map[string]struct{}{ "10.10.10.10": {}, "10.10.10.12": {}, }, diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index fcc3fd2ad9..e7e87724b7 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -218,26 +218,33 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as opposed to via a virtual IP. var passthroughChains []*envoy_listener_v3.FilterChain - for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams { - u := structs.Upstream{ - DestinationName: uid.Name, - DestinationNamespace: uid.NamespaceOrDefault(), - DestinationPartition: uid.PartitionOrDefault(), + for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams { + for tid, addrs := range target { + uid := proxycfg.NewUpstreamIDFromTargetID(tid) + + sni := connect.ServiceSNI( + uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain) + + u := structs.Upstream{ + DestinationName: uid.Name, + DestinationNamespace: uid.NamespaceOrDefault(), + DestinationPartition: uid.PartitionOrDefault(), + } + + filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter) + + filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ + clusterName: "passthrough~" + sni, + filterName: filterName, + protocol: "tcp", + }) + if err != nil { + return nil, err + } + filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(addrs) + + passthroughChains = append(passthroughChains, filterChain) } - - filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter) - - filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - clusterName: "passthrough~" + passthrough.SNI, - filterName: filterName, - protocol: "tcp", - }) - if err != nil { - return nil, err - } - filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(passthrough.Addrs) - - passthroughChains = append(passthroughChains, filterChain) } outboundListener.FilterChains = append(outboundListener.FilterChains, passthroughChains...) diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index c097176f4b..f40d68d841 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -1211,16 +1211,14 @@ func TestListenersFromSnapshot(t *testing.T) { // We add a filter chains for each passthrough service name. // The filter chain will route to a cluster with the same SNI name. 
-				snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
+				snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
 					kafkaUID: {
-						SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-						Addrs: map[string]struct{}{
+						"kafka.default.default.dc1": map[string]struct{}{
 							"9.9.9.9": {},
 						},
 					},
 					mongoUID: {
-						SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
-						Addrs: map[string]struct{}{
+						"mongo.default.default.dc1": map[string]struct{}{
 							"10.10.10.10": {},
 							"10.10.10.12": {},
 						},
diff --git a/agent/xds/testdata/clusters/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden b/agent/xds/testdata/clusters/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
index 122b94d35b..28151072aa 100644
--- a/agent/xds/testdata/clusters/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
+++ b/agent/xds/testdata/clusters/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
@@ -206,7 +206,7 @@
   },
   {
     "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
-    "name": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
+    "name": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
    "type": "ORIGINAL_DST",
    "connectTimeout": "5s",
    "lbPolicy": "CLUSTER_PROVIDED",
@@ -234,18 +234,18 @@
            },
            "matchSubjectAltNames": [
              {
-               "exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/kafka"
+               "exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/kafka"
              }
            ]
          }
        },
-       "sni": "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+       "sni": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
      }
    }
  },
  {
    "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
-   "name": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
+   "name": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
    "type": "ORIGINAL_DST",
    "connectTimeout": "5s",
    "lbPolicy": "CLUSTER_PROVIDED",
@@ -273,12 +273,12 @@
            },
            "matchSubjectAltNames": [
              {
-               "exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/mongo"
+               "exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
              }
            ]
          }
        },
-       "sni": "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+       "sni": "mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
      }
    }
  }
diff --git a/agent/xds/testdata/listeners/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden b/agent/xds/testdata/listeners/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
index 7a422ca402..e5afb75387 100644
--- a/agent/xds/testdata/listeners/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
+++ b/agent/xds/testdata/listeners/transparent-proxy-dial-instances-directly.envoy-1-20-x.golden
@@ -55,7 +55,7 @@
            "typedConfig": {
              "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
              "statPrefix": "upstream.mongo.default.default.dc1",
-             "cluster": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+             "cluster": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
            }
          }
        ]
@@ -95,7 +95,7 @@
            "typedConfig": {
              "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
              "statPrefix": "upstream.kafka.default.default.dc1",
-             "cluster": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
+             "cluster": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
            }
          }
        ]

From cbea3d203c44db86a37920c43015e10e66ecfd97 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 27 Jan 2022 23:49:06 -0700
Subject: [PATCH 3/5] Fix race of upstreams with same passthrough IP

Due to timing, a transparent proxy could have two upstreams to dial
directly with the same address. For example:

- The orders service can dial upstreams shipping and payments directly.
- An instance of shipping at address 10.0.0.1 is deregistered.
- Payments is scaled up and scheduled to have address 10.0.0.1.
- The orders service receives the event for the new payments instance
  before seeing the deregistration for the shipping instance.

At this point two upstreams have the same passthrough address, and
Envoy will reject the listener configuration.

To disambiguate, this commit considers the Raft index when storing
passthrough addresses. In the example above, 10.0.0.1 would only be
associated with the newer payments service instance.
---
 agent/consul/gateway_locator.go |   2 +-
 agent/proxycfg/connect_proxy.go |   8 +-
 agent/proxycfg/manager_test.go  |   2 +
 agent/proxycfg/snapshot.go      |  17 +++-
 agent/proxycfg/state.go         |   2 +-
 agent/proxycfg/state_test.go    | 143 +++++++++++++++++++++++++++++++++-
 agent/proxycfg/upstreams.go     |  40 +++++++--
 agent/structs/structs.go        |   6 +-
 agent/structs/structs_test.go   |  58 ++++++++++++-
 agent/xds/clusters.go           |   2 +-
 agent/xds/endpoints.go          |   4 +-
 agent/xds/listeners.go          |  12 +--
 12 files changed, 270 insertions(+), 26 deletions(-)

diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go
index cac63692f7..ce6e390337 100644
--- a/agent/consul/gateway_locator.go
+++ b/agent/consul/gateway_locator.go
@@ -455,7 +455,7 @@ func retainGateways(full structs.CheckServiceNodes) structs.CheckServiceNodes {
 func renderGatewayAddrs(gateways structs.CheckServiceNodes, wan bool) []string {
 	out := make([]string, 0, len(gateways))
 	for _, csn := range gateways {
-		addr, port := csn.BestAddress(wan)
+		_, addr, port := csn.BestAddress(wan)
 		completeAddr := ipaddr.FormatAddressPort(addr, port)
 		out = append(out, completeAddr)
 	}
diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go
index 12b7028d63..64ce9020c5 100644
--- a/agent/proxycfg/connect_proxy.go
+++ b/agent/proxycfg/connect_proxy.go
@@ -28,6 +28,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
 	snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
 	snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
 	snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
+	snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
 
 	// Watch for root changes
 	err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
@@ -326,11 +327,16 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
 			delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
 		}
 	}
-	for uid, _ := range snap.ConnectProxy.PassthroughUpstreams {
+	for uid := range snap.ConnectProxy.PassthroughUpstreams {
 		if _, ok := seenUpstreams[uid]; !ok {
 			delete(snap.ConnectProxy.PassthroughUpstreams, uid)
 		}
 	}
+	for addr, indexed := range snap.ConnectProxy.PassthroughIndices {
+		if _, ok := seenUpstreams[indexed.upstreamID]; !ok {
+			delete(snap.ConnectProxy.PassthroughIndices, addr)
+		}
+	}
 
 	// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
 	// There have been situations where a discovery watch was cancelled, then fired.
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go
index c514794967..5ac37b793c 100644
--- a/agent/proxycfg/manager_test.go
+++ b/agent/proxycfg/manager_test.go
@@ -235,6 +235,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 					NewUpstreamID(&upstreams[2]): &upstreams[2],
 				},
 				PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
+				PassthroughIndices:   map[string]indexedTarget{},
 			},
 			PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 			WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},
@@ -293,6 +294,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
 					NewUpstreamID(&upstreams[2]): &upstreams[2],
 				},
 				PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
+				PassthroughIndices:   map[string]indexedTarget{},
 			},
 			PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
 			WatchedServiceChecks:   map[structs.ServiceID][]structs.CheckType{},
diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go
index 1b4e0f6bcb..98aafa2629 100644
--- a/agent/proxycfg/snapshot.go
+++ b/agent/proxycfg/snapshot.go
@@ -52,15 +52,30 @@ type ConfigSnapshotUpstreams struct {
 	UpstreamConfig map[UpstreamID]*structs.Upstream
 
 	// PassthroughUpstreams is a map of: UpstreamID -> (map of TargetID ->
-	// (set of IP addresses)).
+	// (set of IP addresses)). It contains the upstream endpoints that
+	// can be dialed directly by a transparent proxy.
 	PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{}
 
+	// PassthroughIndices is a map of: address -> indexedTarget.
+	// It is used to track the modify index associated with a passthrough address.
+	// Tracking this index helps break ties when a single address is shared by
+	// more than one upstream due to a race.
+	PassthroughIndices map[string]indexedTarget
+
 	// IntentionUpstreams is a set of upstreams inferred from intentions.
 	//
 	// This list only applies to proxies registered in 'transparent' mode.
 	IntentionUpstreams map[UpstreamID]struct{}
 }
 
+// indexedTarget is used to associate the Raft modify index of a resource
+// with the corresponding upstream target.
+type indexedTarget struct {
+	upstreamID UpstreamID
+	targetID   string
+	idx        uint64
+}
+
 type GatewayKey struct {
 	Datacenter string
 	Partition  string
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index f31c62b4e1..0c55f034a1 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -412,7 +412,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C
 	)
 	for _, n := range nodes {
-		addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
+		_, addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
 		if net.ParseIP(addr) != nil {
 			hasIP = true
 			continue
diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go
index 1e1a6269e0..c80e57f5dc 100644
--- a/agent/proxycfg/state_test.go
+++ b/agent/proxycfg/state_test.go
@@ -1909,12 +1909,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 											DialedDirectly: true,
 										},
 									},
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 12,
+									},
 								},
 							},
 							{
 								Node: &structs.Node{
 									Node:    "node2",
 									Address: "10.0.0.2",
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 21,
+									},
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -1960,12 +1966,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 											DialedDirectly: true,
 										},
 									},
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 12,
+									},
 								},
 							},
 							{
 								Node: &structs.Node{
 									Node:    "node2",
 									Address: "10.0.0.2",
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 21,
+									},
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -1992,6 +2004,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							},
 						},
 					})
+					require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
+						"10.0.0.2": {
+							upstreamID: dbUID,
+							targetID:   "db.default.default.dc1",
+							idx:        21,
+						},
+						"10.10.10.10": {
+							upstreamID: dbUID,
+							targetID:   "db.default.default.dc1",
+							idx:        12,
+						},
+					})
 				},
 			},
 			// Discovery chain updates should be stored
@@ -2043,6 +2067,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 								Node: &structs.Node{
 									Node:    "node2",
 									Address: "10.0.0.2",
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 21,
+									},
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -2074,6 +2101,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 								Node: &structs.Node{
 									Node:    "node2",
 									Address: "10.0.0.2",
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 21,
+									},
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -2096,10 +2126,120 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							},
 						},
 					})
+					require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
+						"10.0.0.2": {
+							upstreamID: dbUID,
+							targetID:   "db.default.default.dc1",
+							idx:        21,
+						},
+					})
 				},
 			},
-			// Empty list of upstreams should clean everything up
 			{
+				// Receive a new upstream target event with a conflicting passthrough address
+				events: []cache.UpdateEvent{
+					{
+						CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
+						Result: &structs.IndexedCheckServiceNodes{
+							Nodes: structs.CheckServiceNodes{
+								{
+									Node: &structs.Node{
+										Node: "node2",
+									},
+									Service: &structs.NodeService{
+										Kind:    structs.ServiceKindConnectProxy,
+										ID:      "api-sidecar-proxy",
+										Service: "api-sidecar-proxy",
+										Address: "10.0.0.2",
+										Proxy: structs.ConnectProxyConfig{
+											DestinationServiceName: "api",
+											TransparentProxy: structs.TransparentProxyConfig{
+												DialedDirectly: true,
+											},
+										},
+										RaftIndex: structs.RaftIndex{
+											ModifyIndex: 32,
+										},
+									},
+								},
+							},
+						},
+						Err: nil,
+					},
+				},
+				verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
+
+					// The new endpoint should be stored for the api target.
+					require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"],
+						structs.CheckServiceNodes{
+							{
+								Node: &structs.Node{
+									Node: "node2",
+								},
+								Service: &structs.NodeService{
+									Kind:    structs.ServiceKindConnectProxy,
+									ID:      "api-sidecar-proxy",
+									Service: "api-sidecar-proxy",
+									Address: "10.0.0.2",
+									Proxy: structs.ConnectProxyConfig{
+										DestinationServiceName: "api",
+										TransparentProxy: structs.TransparentProxyConfig{
+											DialedDirectly: true,
+										},
+									},
+									RaftIndex: structs.RaftIndex{
+										ModifyIndex: 32,
+									},
+								},
+							},
+						},
+					)
+					require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
+						apiUID: {
+							// This target has a higher index so the old passthrough address should be discarded.
+							"api.default.default.dc1": map[string]struct{}{
+								"10.0.0.2": {},
+							},
+						},
+					})
+					require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
+						"10.0.0.2": {
+							upstreamID: apiUID,
+							targetID:   "api.default.default.dc1",
+							idx:        32,
+						},
+					})
+				},
+			},
+			{
+				// Event with no nodes should clean up addrs
+				events: []cache.UpdateEvent{
+					{
+						CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
+						Result: &structs.IndexedCheckServiceNodes{
+							Nodes: structs.CheckServiceNodes{},
+						},
+						Err: nil,
+					},
+				},
+				verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
+					require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
+					require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
+
+					// The endpoint and passthrough address for the api target should be gone.
+					require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"])
+					require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[apiUID]["api.default.default.dc1"])
+					require.Empty(t, snap.ConnectProxy.PassthroughIndices)
+				},
+			},
+			{
+				// Empty list of upstreams should clean up map keys
 				requiredWatches: map[string]verifyWatchRequest{
 					rootsWatchID: genVerifyRootsWatch("dc1"),
 					intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
@@ -2128,6 +2268,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 					require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
 					require.Empty(t, snap.ConnectProxy.DiscoveryChain)
 					require.Empty(t, snap.ConnectProxy.IntentionUpstreams)
 					require.Empty(t, snap.ConnectProxy.PassthroughUpstreams)
+					require.Empty(t, snap.ConnectProxy.PassthroughIndices)
 				},
 			},
 		},
diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go
index 33b8015162..4cd406a42d 100644
--- a/agent/proxycfg/upstreams.go
+++ b/agent/proxycfg/upstreams.go
@@ -94,10 +94,17 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
 			return nil
 		}
 
-		if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; !ok {
-			upstreamsSnapshot.PassthroughUpstreams[uid] = make(map[string]map[string]struct{})
+		// Clear out this target's existing passthrough upstreams and indices so that they can be repopulated below.
+		if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; ok {
+			for addr := range upstreamsSnapshot.PassthroughUpstreams[uid][targetID] {
+				if indexed := upstreamsSnapshot.PassthroughIndices[addr]; indexed.targetID == targetID && indexed.upstreamID == uid {
+					delete(upstreamsSnapshot.PassthroughIndices, addr)
+				}
+			}
+			upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
 		}
-		upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
+
+		passthroughs := make(map[string]struct{})
 
 		for _, node := range resp.Nodes {
 			if !node.Service.Proxy.TransparentProxy.DialedDirectly {
@@ -107,8 +114,31 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
 			// Make sure to use an external address when crossing partitions.
 			// Datacenter is not considered because transparent proxies cannot dial other datacenters.
 			isRemote := !structs.EqualPartitions(node.Node.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
-			addr, _ := node.BestAddress(isRemote)
-			upstreamsSnapshot.PassthroughUpstreams[uid][targetID][addr] = struct{}{}
+			csnIdx, addr, _ := node.BestAddress(isRemote)
+
+			existing := upstreamsSnapshot.PassthroughIndices[addr]
+			if existing.idx > csnIdx {
+				// The last known instance with this address had a higher index so it takes precedence.
+				continue
+			}
+
+			// The current instance has a higher Raft index so we ensure the passthrough address is only
+			// associated with this upstream target. Older associations are cleaned up as needed.
+			delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID], addr)
+			if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID]) == 0 {
+				delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID], existing.targetID)
+			}
+			if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID]) == 0 {
+				delete(upstreamsSnapshot.PassthroughUpstreams, existing.upstreamID)
+			}
+
+			upstreamsSnapshot.PassthroughIndices[addr] = indexedTarget{idx: csnIdx, upstreamID: uid, targetID: targetID}
+			passthroughs[addr] = struct{}{}
+		}
+		if len(passthroughs) > 0 {
+			upstreamsSnapshot.PassthroughUpstreams[uid] = map[string]map[string]struct{}{
+				targetID: passthroughs,
+			}
 		}
 
 	case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
diff --git a/agent/structs/structs.go b/agent/structs/structs.go
index 168c70efe1..e2e94b4387 100644
--- a/agent/structs/structs.go
+++ b/agent/structs/structs.go
@@ -1741,7 +1741,7 @@ type CheckServiceNode struct {
 	Checks  HealthChecks
 }
 
-func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
+func (csn *CheckServiceNode) BestAddress(wan bool) (uint64, string, int) {
 	// TODO (mesh-gateway) needs a test
 	// best address
 	// wan
 	//   wan svc addr
 	//   svc addr
 	//   node addr
 	addr, port := csn.Service.BestAddress(wan)
+	idx := csn.Service.ModifyIndex
 
 	if addr == "" {
 		addr = csn.Node.BestAddress(wan)
+		idx = csn.Node.ModifyIndex
 	}
 
-	return addr, port
+	return idx, addr, port
 }
 
 func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision {
diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go
index b4ea8e50c5..f87d568d43 100644
--- a/agent/structs/structs_test.go
+++ b/agent/structs/structs_test.go
@@ -2105,14 +2105,18 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 		input   CheckServiceNode
 		lanAddr string
 		lanPort int
+		lanIdx  uint64
 		wanAddr string
 		wanPort int
+		wanIdx  uint64
 	}
 
 	nodeAddr := "10.1.2.3"
 	nodeWANAddr := "198.18.19.20"
+	nodeIdx := uint64(11)
 	serviceAddr := "10.2.3.4"
 	servicePort := 1234
+	serviceIdx := uint64(22)
 	serviceWANAddr := "198.19.20.21"
 	serviceWANPort := 987
 
@@ -2121,15 +2125,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 			input: CheckServiceNode{
 				Node: &Node{
 					Address: nodeAddr,
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Port: servicePort,
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: nodeAddr,
+			lanIdx:  nodeIdx,
 			lanPort: servicePort,
 			wanAddr: nodeAddr,
+			wanIdx:  nodeIdx,
 			wanPort: servicePort,
 		},
 		"node-wan-address": {
@@ -2139,15 +2151,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 					TaggedAddresses: map[string]string{
 						"wan": nodeWANAddr,
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Port: servicePort,
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: nodeAddr,
+			lanIdx:  nodeIdx,
 			lanPort: servicePort,
 			wanAddr: nodeWANAddr,
+			wanIdx:  nodeIdx,
 			wanPort: servicePort,
 		},
 		"service-address": {
@@ -2158,16 +2178,24 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 					TaggedAddresses: map[string]string{
 						"wan": nodeWANAddr,
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Address: serviceAddr,
 					Port:    servicePort,
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: serviceAddr,
+			lanIdx:  serviceIdx,
 			lanPort: servicePort,
 			wanAddr: serviceAddr,
+			wanIdx:  serviceIdx,
 			wanPort: servicePort,
 		},
 		"service-wan-address": {
@@ -2178,6 +2206,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 					TaggedAddresses: map[string]string{
 						"wan": nodeWANAddr,
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Address: serviceAddr,
@@ -2188,12 +2219,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 							Port:    serviceWANPort,
 						},
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: serviceAddr,
+			lanIdx:  serviceIdx,
 			lanPort: servicePort,
 			wanAddr: serviceWANAddr,
+			wanIdx:  serviceIdx,
 			wanPort: serviceWANPort,
 		},
 		"service-wan-address-default-port": {
@@ -2204,6 +2240,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 					TaggedAddresses: map[string]string{
 						"wan": nodeWANAddr,
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Address: serviceAddr,
@@ -2214,12 +2253,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 							Port:    0,
 						},
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: serviceAddr,
+			lanIdx:  serviceIdx,
 			lanPort: servicePort,
 			wanAddr: serviceWANAddr,
+			wanIdx:  serviceIdx,
 			wanPort: servicePort,
 		},
 		"service-wan-address-node-lan": {
@@ -2230,6 +2274,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 					TaggedAddresses: map[string]string{
 						"wan": nodeWANAddr,
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: nodeIdx,
+					},
 				},
 				Service: &NodeService{
 					Port: servicePort,
@@ -2239,12 +2286,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 							Port:    serviceWANPort,
 						},
 					},
+					RaftIndex: RaftIndex{
+						ModifyIndex: serviceIdx,
+					},
 				},
 			},
 
 			lanAddr: nodeAddr,
+			lanIdx:  nodeIdx,
 			lanPort: servicePort,
 			wanAddr: serviceWANAddr,
+			wanIdx:  serviceIdx,
 			wanPort: serviceWANPort,
 		},
 	}
@@ -2254,13 +2306,15 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
 		tc := tc
 		t.Run(name, func(t *testing.T) {
 
-			addr, port := tc.input.BestAddress(false)
+			idx, addr, port := tc.input.BestAddress(false)
 			require.Equal(t, tc.lanAddr, addr)
 			require.Equal(t, tc.lanPort, port)
+			require.Equal(t, tc.lanIdx, idx)
 
-			addr, port = tc.input.BestAddress(true)
+			idx, addr, port = tc.input.BestAddress(true)
 			require.Equal(t, tc.wanAddr, addr)
 			require.Equal(t, tc.wanPort, port)
+			require.Equal(t, tc.wanIdx, idx)
 		})
 	}
 }
diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go
index ab27bc9b5f..283df4125b 100644
--- a/agent/xds/clusters.go
+++ b/agent/xds/clusters.go
@@ -907,7 +907,7 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
 		fallback *envoy_endpoint_v3.LbEndpoint
 	)
 	for i, e := range opts.hostnameEndpoints {
-		addr, port := e.BestAddress(opts.isRemote)
+		_, addr, port := e.BestAddress(opts.isRemote)
 		uniqueHostnames[addr] = true
 
 		health, weight := calculateEndpointHealthAndWeight(e, opts.onlyPassing)
diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go
index 4742ea5264..9981dc9401 100644
--- a/agent/xds/endpoints.go
+++ b/agent/xds/endpoints.go
@@ -221,7 +221,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
 	for _, srv := range cfgSnap.MeshGateway.ConsulServers {
 		clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
 
-		addr, port := srv.BestAddress(false /*wan*/)
+		_, addr, port := srv.BestAddress(false /*wan*/)
 
 		lbEndpoint := &envoy_endpoint_v3.LbEndpoint{
 			HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
@@ -512,7 +512,7 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
 		for _, ep := range endpoints {
 			// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
-			addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
+			_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
 			healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
 
 			if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go
index e7e87724b7..bf087780ad 100644
--- a/agent/xds/listeners.go
+++ b/agent/xds/listeners.go
@@ -218,20 +218,14 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
 		// as opposed to via a virtual IP.
 		var passthroughChains []*envoy_listener_v3.FilterChain
 
-		for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams {
-			for tid, addrs := range target {
+		for _, targets := range cfgSnap.ConnectProxy.PassthroughUpstreams {
+			for tid, addrs := range targets {
 				uid := proxycfg.NewUpstreamIDFromTargetID(tid)
 
 				sni := connect.ServiceSNI(
 					uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
 
-				u := structs.Upstream{
-					DestinationName:      uid.Name,
-					DestinationNamespace: uid.NamespaceOrDefault(),
-					DestinationPartition: uid.PartitionOrDefault(),
-				}
-
-				filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter)
+				filterName := fmt.Sprintf("%s.%s.%s.%s", uid.Name, uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter)
 
 				filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
 					clusterName: "passthrough~" + sni,

From ceb52d649a6b569195428c5895c12a1ed982a640 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Wed, 9 Feb 2022 17:16:00 -0700
Subject: [PATCH 4/5] Account for upstream targets in another DC.

Transparent proxies typically cannot dial upstreams in remote
datacenters.
However, if their upstream configures a redirect to a remote DC, then
the upstream targets will be in another datacenter. In that case we
should use the WAN address for the passthrough.
---
 agent/proxycfg/state_test.go | 36 ++++++++++++++++++++++--------------
 agent/proxycfg/upstreams.go  |  5 ++---
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go
index c80e57f5dc..7e88c6eabc 100644
--- a/agent/proxycfg/state_test.go
+++ b/agent/proxycfg/state_test.go
@@ -1891,8 +1891,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							Nodes: structs.CheckServiceNodes{
 								{
 									Node: &structs.Node{
-										Node:    "node1",
-										Address: "10.0.0.1",
+										Datacenter: "dc1",
+										Node:       "node1",
+										Address:    "10.0.0.1",
 									},
 									Service: &structs.NodeService{
 										Kind:    structs.ServiceKindConnectProxy,
@@ -1916,8 +1917,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 								},
 								{
 									Node: &structs.Node{
-										Node:    "node2",
-										Address: "10.0.0.2",
+										Datacenter: "dc1",
+										Node:       "node2",
+										Address:    "10.0.0.2",
 										RaftIndex: structs.RaftIndex{
 											ModifyIndex: 21,
 										},
 									},
 									Service: &structs.NodeService{
 										Kind:    structs.ServiceKindConnectProxy,
@@ -1948,8 +1950,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 						structs.CheckServiceNodes{
 							{
 								Node: &structs.Node{
-									Node:    "node1",
-									Address: "10.0.0.1",
+									Datacenter: "dc1",
+									Node:       "node1",
+									Address:    "10.0.0.1",
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -1973,8 +1976,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							},
 							{
 								Node: &structs.Node{
-									Node:    "node2",
-									Address: "10.0.0.2",
+									Datacenter: "dc1",
+									Node:       "node2",
+									Address:    "10.0.0.2",
 									RaftIndex: structs.RaftIndex{
 										ModifyIndex: 21,
 									},
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
@@ -2065,8 +2069,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							Nodes: structs.CheckServiceNodes{
 								{
 									Node: &structs.Node{
-										Node:    "node2",
-										Address: "10.0.0.2",
+										Datacenter: "dc1",
+										Node:       "node2",
+										Address:    "10.0.0.2",
 										RaftIndex: structs.RaftIndex{
 											ModifyIndex: 21,
 										},
 									},
@@ -2099,8 +2104,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 						structs.CheckServiceNodes{
 							{
 								Node: &structs.Node{
-									Node:    "node2",
-									Address: "10.0.0.2",
+									Datacenter: "dc1",
+									Node:       "node2",
+									Address:    "10.0.0.2",
 									RaftIndex: structs.RaftIndex{
 										ModifyIndex: 21,
 									},
 								},
@@ -2144,7 +2150,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 							Nodes: structs.CheckServiceNodes{
 								{
 									Node: &structs.Node{
-										Node: "node2",
+										Datacenter: "dc1",
+										Node:       "node2",
 									},
 									Service: &structs.NodeService{
 										Kind:    structs.ServiceKindConnectProxy,
@@ -2178,7 +2185,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 						structs.CheckServiceNodes{
 							{
 								Node: &structs.Node{
-									Node: "node2",
+									Datacenter: "dc1",
+									Node:       "node2",
 								},
 								Service: &structs.NodeService{
 									Kind:    structs.ServiceKindConnectProxy,
diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go
index 4cd406a42d..e77b554ff9 100644
--- a/agent/proxycfg/upstreams.go
+++ b/agent/proxycfg/upstreams.go
@@ -111,9 +111,8 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
 				continue
 			}
 
-			// Make sure to use an external address when crossing partitions.
-			// Datacenter is not considered because transparent proxies cannot dial other datacenters.
-			isRemote := !structs.EqualPartitions(node.Node.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
+			// Make sure to use an external address when crossing partition or DC boundaries.
+			isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
 			csnIdx, addr, _ := node.BestAddress(isRemote)
 
 			existing := upstreamsSnapshot.PassthroughIndices[addr]

From 5d882badcb2964120c95201c27aff200d94ec756 Mon Sep 17 00:00:00 2001
From: freddygv
Date: Thu, 10 Feb 2022 17:21:34 -0700
Subject: [PATCH 5/5] Add changelog entry

---
 .changelog/12223.txt | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .changelog/12223.txt

diff --git a/.changelog/12223.txt b/.changelog/12223.txt
new file mode 100644
index 0000000000..f7fbcee6e4
--- /dev/null
+++ b/.changelog/12223.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+connect: fixes a bug where passthrough addresses for upstreams dialed directly by transparent proxies weren't being cleaned up.
+```
\ No newline at end of file
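
Reviewer note (not part of the patch series): the bookkeeping introduced across PATCH 2 and PATCH 3 can be reasoned about outside the full proxycfg machinery. The sketch below is a minimal, self-contained illustration of the Raft-index tie-breaking rule, using simplified stand-in types that are assumptions for the example only (`upstreamID` as a plain string, a bare `snapshot` struct, and a `record` helper, none of which exist in the Consul codebase); the real logic lives in `agent/proxycfg/upstreams.go`.

```go
package main

import "fmt"

// Simplified stand-in for proxycfg.UpstreamID.
type upstreamID string

// indexedTarget mirrors the struct added in PATCH 3: it ties an address to
// the upstream target whose instance last claimed it, plus that instance's
// Raft ModifyIndex.
type indexedTarget struct {
	upstreamID upstreamID
	targetID   string
	idx        uint64
}

// snapshot is a stand-in for the relevant ConfigSnapshotUpstreams fields.
type snapshot struct {
	passthroughUpstreams map[upstreamID]map[string]map[string]struct{}
	passthroughIndices   map[string]indexedTarget
}

// record applies the tie-breaking rule: an address is only associated with
// the instance that has the highest Raft ModifyIndex, so Envoy never sees
// the same IP in two filter chain match rules.
func (s *snapshot) record(uid upstreamID, targetID, addr string, idx uint64) {
	existing, ok := s.passthroughIndices[addr]
	if ok && existing.idx > idx {
		return // a newer instance already claimed this address
	}
	if ok {
		// Remove the older association before storing the new one.
		delete(s.passthroughUpstreams[existing.upstreamID][existing.targetID], addr)
	}
	if s.passthroughUpstreams[uid] == nil {
		s.passthroughUpstreams[uid] = make(map[string]map[string]struct{})
	}
	if s.passthroughUpstreams[uid][targetID] == nil {
		s.passthroughUpstreams[uid][targetID] = make(map[string]struct{})
	}
	s.passthroughUpstreams[uid][targetID][addr] = struct{}{}
	s.passthroughIndices[addr] = indexedTarget{upstreamID: uid, targetID: targetID, idx: idx}
}

func main() {
	s := &snapshot{
		passthroughUpstreams: make(map[upstreamID]map[string]map[string]struct{}),
		passthroughIndices:   make(map[string]indexedTarget),
	}
	s.record("shipping", "shipping.default.default.dc1", "10.0.0.1", 10)
	// The payments instance reuses 10.0.0.1 with a higher index, so it wins.
	s.record("payments", "payments.default.default.dc1", "10.0.0.1", 32)
	fmt.Println(s.passthroughUpstreams["shipping"]) // map[shipping.default.default.dc1:map[]]
	fmt.Println(s.passthroughUpstreams["payments"]) // map[payments.default.default.dc1:map[10.0.0.1:{}]]
}
```

Running the sketch shows the shipping target losing the contested address to the payments instance with the higher ModifyIndex, which mirrors the orders/shipping/payments scenario described in the PATCH 3 commit message.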