mirror of https://github.com/hashicorp/consul
Merge pull request #12223 from hashicorp/proxycfg/passthrough-cleanup
commit
9580f79f86
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect: fixes bug where passthrough addressses for transparent proxies dialed directly weren't being cleaned up.
|
||||
```
|
|
@ -455,7 +455,7 @@ func retainGateways(full structs.CheckServiceNodes) structs.CheckServiceNodes {
|
|||
func renderGatewayAddrs(gateways structs.CheckServiceNodes, wan bool) []string {
|
||||
out := make([]string, 0, len(gateways))
|
||||
for _, csn := range gateways {
|
||||
addr, port := csn.BestAddress(wan)
|
||||
_, addr, port := csn.BestAddress(wan)
|
||||
completeAddr := ipaddr.FormatAddressPort(addr, port)
|
||||
out = append(out, completeAddr)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
||||
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
|
||||
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs)
|
||||
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
|
||||
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
|
||||
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
|
@ -326,6 +327,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
|
|||
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
|
||||
}
|
||||
}
|
||||
for uid := range snap.ConnectProxy.PassthroughUpstreams {
|
||||
if _, ok := seenUpstreams[uid]; !ok {
|
||||
delete(snap.ConnectProxy.PassthroughUpstreams, uid)
|
||||
}
|
||||
}
|
||||
for addr, indexed := range snap.ConnectProxy.PassthroughIndices {
|
||||
if _, ok := seenUpstreams[indexed.upstreamID]; !ok {
|
||||
delete(snap.ConnectProxy.PassthroughIndices, addr)
|
||||
}
|
||||
}
|
||||
|
||||
// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
|
||||
// There have been situations where a discovery watch was cancelled, then fired.
|
||||
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up
|
||||
|
|
|
@ -234,7 +234,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||
NewUpstreamID(&upstreams[1]): &upstreams[1],
|
||||
NewUpstreamID(&upstreams[2]): &upstreams[2],
|
||||
},
|
||||
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
|
||||
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
|
||||
PassthroughIndices: map[string]indexedTarget{},
|
||||
},
|
||||
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
|
||||
|
@ -292,7 +293,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||
NewUpstreamID(&upstreams[1]): &upstreams[1],
|
||||
NewUpstreamID(&upstreams[2]): &upstreams[2],
|
||||
},
|
||||
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
|
||||
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
|
||||
PassthroughIndices: map[string]indexedTarget{},
|
||||
},
|
||||
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
|
||||
|
|
|
@ -45,6 +45,25 @@ func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID {
|
|||
return id
|
||||
}
|
||||
|
||||
func NewUpstreamIDFromTargetID(tid string) UpstreamID {
|
||||
// Drop the leading subset if one is present in the target ID.
|
||||
separators := strings.Count(tid, ".")
|
||||
if separators > 3 {
|
||||
prefix := tid[:strings.Index(tid, ".")+1]
|
||||
tid = strings.TrimPrefix(tid, prefix)
|
||||
}
|
||||
|
||||
split := strings.SplitN(tid, ".", 4)
|
||||
|
||||
id := UpstreamID{
|
||||
Name: split[0],
|
||||
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(split[2], split[1]),
|
||||
Datacenter: split[3],
|
||||
}
|
||||
id.normalize()
|
||||
return id
|
||||
}
|
||||
|
||||
func (u *UpstreamID) normalize() {
|
||||
if u.Type == structs.UpstreamDestTypeService {
|
||||
u.Type = ""
|
||||
|
|
|
@ -8,6 +8,43 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// TODO(freddy): Needs enterprise test
|
||||
func TestUpstreamIDFromTargetID(t *testing.T) {
|
||||
type testcase struct {
|
||||
tid string
|
||||
expect UpstreamID
|
||||
}
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
tc.expect.EnterpriseMeta.Normalize()
|
||||
|
||||
got := NewUpstreamIDFromTargetID(tc.tid)
|
||||
require.Equal(t, tc.expect, got)
|
||||
}
|
||||
|
||||
cases := map[string]testcase{
|
||||
"with subset": {
|
||||
tid: "v1.foo.default.default.dc2",
|
||||
expect: UpstreamID{
|
||||
Name: "foo",
|
||||
Datacenter: "dc2",
|
||||
},
|
||||
},
|
||||
"without subset": {
|
||||
tid: "foo.default.default.dc2",
|
||||
expect: UpstreamID{
|
||||
Name: "foo",
|
||||
Datacenter: "dc2",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpstreamIDFromString(t *testing.T) {
|
||||
type testcase struct {
|
||||
id string
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"github.com/mitchellh/copystructure"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
|
@ -52,8 +51,16 @@ type ConfigSnapshotUpstreams struct {
|
|||
// UpstreamConfig is a map to an upstream's configuration.
|
||||
UpstreamConfig map[UpstreamID]*structs.Upstream
|
||||
|
||||
// PassthroughEndpoints is a map of: UpstreamID -> ServicePassthroughAddrs.
|
||||
PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs
|
||||
// PassthroughEndpoints is a map of: UpstreamID -> (map of TargetID ->
|
||||
// (set of IP addresses)). It contains the upstream endpoints that
|
||||
// can be dialed directly by a transparent proxy.
|
||||
PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{}
|
||||
|
||||
// PassthroughIndices is a map of: address -> indexedTarget.
|
||||
// It is used to track the modify index associated with a passthrough address.
|
||||
// Tracking this index helps break ties when a single address is shared by
|
||||
// more than one upstream due to a race.
|
||||
PassthroughIndices map[string]indexedTarget
|
||||
|
||||
// IntentionUpstreams is a set of upstreams inferred from intentions.
|
||||
//
|
||||
|
@ -61,6 +68,14 @@ type ConfigSnapshotUpstreams struct {
|
|||
IntentionUpstreams map[UpstreamID]struct{}
|
||||
}
|
||||
|
||||
// indexedTarget is used to associate the Raft modify index of a resource
|
||||
// with the corresponding upstream target.
|
||||
type indexedTarget struct {
|
||||
upstreamID UpstreamID
|
||||
targetID string
|
||||
idx uint64
|
||||
}
|
||||
|
||||
type GatewayKey struct {
|
||||
Datacenter string
|
||||
Partition string
|
||||
|
@ -91,18 +106,6 @@ func gatewayKeyFromString(s string) GatewayKey {
|
|||
return GatewayKey{Partition: split[0], Datacenter: split[1]}
|
||||
}
|
||||
|
||||
// ServicePassthroughAddrs contains the LAN addrs
|
||||
type ServicePassthroughAddrs struct {
|
||||
// SNI is the Service SNI of the upstream.
|
||||
SNI string
|
||||
|
||||
// SpiffeID is the SPIFFE ID to use for upstream SAN validation.
|
||||
SpiffeID connect.SpiffeIDService
|
||||
|
||||
// Addrs is a set of the best LAN addresses for the instances of the upstream.
|
||||
Addrs map[string]struct{}
|
||||
}
|
||||
|
||||
type configSnapshotConnectProxy struct {
|
||||
ConfigSnapshotUpstreams
|
||||
|
||||
|
|
|
@ -412,7 +412,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C
|
|||
)
|
||||
|
||||
for _, n := range nodes {
|
||||
addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
|
||||
_, addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
|
||||
if net.ParseIP(addr) != nil {
|
||||
hasIP = true
|
||||
continue
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -1892,8 +1891,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
Address: "10.0.0.1",
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "10.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
|
@ -1910,12 +1910,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 12,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 21,
|
||||
},
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
|
@ -1943,8 +1950,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
Address: "10.0.0.1",
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "10.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
|
@ -1961,12 +1969,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 12,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 21,
|
||||
},
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
|
@ -1985,22 +2000,26 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
// The LAN service address is used below because transparent proxying
|
||||
// does not support querying service nodes in other DCs, and the WAN address
|
||||
// should not be used in DC-local calls.
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
|
||||
dbUID: {
|
||||
SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
|
||||
SpiffeID: connect.SpiffeIDService{
|
||||
Host: snap.Roots.TrustDomain,
|
||||
Namespace: db.NamespaceOrDefault(),
|
||||
Partition: db.PartitionOrDefault(),
|
||||
Datacenter: snap.Datacenter,
|
||||
Service: "db",
|
||||
},
|
||||
Addrs: map[string]struct{}{
|
||||
"db.default.default.dc1": map[string]struct{}{
|
||||
"10.10.10.10": {},
|
||||
"10.0.0.2": {},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
|
||||
"10.0.0.2": {
|
||||
upstreamID: dbUID,
|
||||
targetID: "db.default.default.dc1",
|
||||
idx: 21,
|
||||
},
|
||||
"10.10.10.10": {
|
||||
upstreamID: dbUID,
|
||||
targetID: "db.default.default.dc1",
|
||||
idx: 12,
|
||||
},
|
||||
})
|
||||
},
|
||||
},
|
||||
// Discovery chain updates should be stored
|
||||
|
@ -2041,8 +2060,194 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1")
|
||||
},
|
||||
},
|
||||
// Empty list of upstreams should clean everything up
|
||||
{
|
||||
// Receive a new upstream target event without proxy1.
|
||||
events: []cache.UpdateEvent{
|
||||
{
|
||||
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 21,
|
||||
},
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "db-sidecar-proxy2",
|
||||
Service: "db-sidecar-proxy",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "db",
|
||||
TransparentProxy: structs.TransparentProxyConfig{
|
||||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID)
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1")
|
||||
|
||||
// THe endpoint and passthrough address for proxy1 should be gone.
|
||||
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"],
|
||||
structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Address: "10.0.0.2",
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 21,
|
||||
},
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "db-sidecar-proxy2",
|
||||
Service: "db-sidecar-proxy",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "db",
|
||||
TransparentProxy: structs.TransparentProxyConfig{
|
||||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
|
||||
dbUID: {
|
||||
"db.default.default.dc1": map[string]struct{}{
|
||||
"10.0.0.2": {},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
|
||||
"10.0.0.2": {
|
||||
upstreamID: dbUID,
|
||||
targetID: "db.default.default.dc1",
|
||||
idx: 21,
|
||||
},
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
// Receive a new upstream target event with a conflicting passthrough address
|
||||
events: []cache.UpdateEvent{
|
||||
{
|
||||
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "api-sidecar-proxy",
|
||||
Service: "api-sidecar-proxy",
|
||||
Address: "10.0.0.2",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
TransparentProxy: structs.TransparentProxyConfig{
|
||||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 32,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
|
||||
|
||||
// THe endpoint and passthrough address for proxy1 should be gone.
|
||||
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"],
|
||||
structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "api-sidecar-proxy",
|
||||
Service: "api-sidecar-proxy",
|
||||
Address: "10.0.0.2",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
TransparentProxy: structs.TransparentProxyConfig{
|
||||
DialedDirectly: true,
|
||||
},
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: 32,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{
|
||||
apiUID: {
|
||||
// This target has a higher index so the old passthrough address should be discarded.
|
||||
"api.default.default.dc1": map[string]struct{}{
|
||||
"10.0.0.2": {},
|
||||
},
|
||||
},
|
||||
})
|
||||
require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{
|
||||
"10.0.0.2": {
|
||||
upstreamID: apiUID,
|
||||
targetID: "api.default.default.dc1",
|
||||
idx: 32,
|
||||
},
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
// Event with no nodes should clean up addrs
|
||||
events: []cache.UpdateEvent{
|
||||
{
|
||||
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID)
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1)
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
|
||||
|
||||
// The endpoint and passthrough address for proxy1 should be gone.
|
||||
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[apiUID]["api.default.default.dc1"])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
},
|
||||
},
|
||||
{
|
||||
// Empty list of upstreams should clean up map keys
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
rootsWatchID: genVerifyRootsWatch("dc1"),
|
||||
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
|
||||
|
@ -2070,6 +2275,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
|
||||
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
|
||||
require.Empty(t, snap.ConnectProxy.IntentionUpstreams)
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams)
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -9,8 +9,6 @@ import (
|
|||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
@ -92,55 +90,53 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
|
|||
}
|
||||
upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes
|
||||
|
||||
var passthroughAddrs map[string]ServicePassthroughAddrs
|
||||
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clear out this target's existing passthrough upstreams and indices so that they can be repopulated below.
|
||||
if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; ok {
|
||||
for addr := range upstreamsSnapshot.PassthroughUpstreams[uid][targetID] {
|
||||
if indexed := upstreamsSnapshot.PassthroughIndices[addr]; indexed.targetID == targetID && indexed.upstreamID == uid {
|
||||
delete(upstreamsSnapshot.PassthroughIndices, addr)
|
||||
}
|
||||
}
|
||||
upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{})
|
||||
}
|
||||
|
||||
passthroughs := make(map[string]struct{})
|
||||
|
||||
for _, node := range resp.Nodes {
|
||||
if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
|
||||
if passthroughAddrs == nil {
|
||||
passthroughAddrs = make(map[string]ServicePassthroughAddrs)
|
||||
}
|
||||
if !node.Service.Proxy.TransparentProxy.DialedDirectly {
|
||||
continue
|
||||
}
|
||||
|
||||
svc := node.Service.CompoundServiceName()
|
||||
// Make sure to use an external address when crossing partition or DC boundaries.
|
||||
isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
|
||||
csnIdx, addr, _ := node.BestAddress(isRemote)
|
||||
|
||||
// Overwrite the name if it's a connect proxy (as opposed to Connect native).
|
||||
// We don't reference the proxy name directly for things like SNI, but rather the name
|
||||
// of the destination. The enterprise meta of a proxy will always be the same as that of
|
||||
// the destination service, so that remains intact.
|
||||
if node.Service.Kind == structs.ServiceKindConnectProxy {
|
||||
dst := node.Service.Proxy.DestinationServiceName
|
||||
if dst == "" {
|
||||
dst = node.Service.Proxy.DestinationServiceID
|
||||
}
|
||||
svc.Name = dst
|
||||
}
|
||||
existing := upstreamsSnapshot.PassthroughIndices[addr]
|
||||
if existing.idx > csnIdx {
|
||||
// The last known instance with this address had a higher index so it takes precedence.
|
||||
continue
|
||||
}
|
||||
|
||||
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), snap.Datacenter, snap.Roots.TrustDomain)
|
||||
// The current instance has a higher Raft index so we ensure the passthrough address is only
|
||||
// associated with this upstream target. Older associations are cleaned up as needed.
|
||||
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID], addr)
|
||||
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID]) == 0 {
|
||||
delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID], existing.targetID)
|
||||
}
|
||||
if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID]) == 0 {
|
||||
delete(upstreamsSnapshot.PassthroughUpstreams, existing.upstreamID)
|
||||
}
|
||||
|
||||
spiffeID := connect.SpiffeIDService{
|
||||
Host: snap.Roots.TrustDomain,
|
||||
Partition: svc.PartitionOrDefault(),
|
||||
Namespace: svc.NamespaceOrDefault(),
|
||||
Datacenter: snap.Datacenter,
|
||||
Service: svc.Name,
|
||||
}
|
||||
|
||||
svcUID := NewUpstreamIDFromServiceName(svc)
|
||||
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok {
|
||||
upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{
|
||||
SNI: sni,
|
||||
SpiffeID: spiffeID,
|
||||
|
||||
// Stored in a set because it's possible for these to be duplicated
|
||||
// when the upstream-target is targeted by multiple discovery chains.
|
||||
Addrs: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure to use an external address when crossing partitions.
|
||||
isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
|
||||
addr, _ := node.BestAddress(isRemote)
|
||||
|
||||
upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{}
|
||||
upstreamsSnapshot.PassthroughIndices[addr] = indexedTarget{idx: csnIdx, upstreamID: uid, targetID: targetID}
|
||||
passthroughs[addr] = struct{}{}
|
||||
}
|
||||
if len(passthroughs) > 0 {
|
||||
upstreamsSnapshot.PassthroughUpstreams[uid] = map[string]map[string]struct{}{
|
||||
targetID: passthroughs,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1741,7 +1741,7 @@ type CheckServiceNode struct {
|
|||
Checks HealthChecks
|
||||
}
|
||||
|
||||
func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
|
||||
func (csn *CheckServiceNode) BestAddress(wan bool) (uint64, string, int) {
|
||||
// TODO (mesh-gateway) needs a test
|
||||
// best address
|
||||
// wan
|
||||
|
@ -1754,12 +1754,14 @@ func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) {
|
|||
// node addr
|
||||
|
||||
addr, port := csn.Service.BestAddress(wan)
|
||||
idx := csn.Service.ModifyIndex
|
||||
|
||||
if addr == "" {
|
||||
addr = csn.Node.BestAddress(wan)
|
||||
idx = csn.Node.ModifyIndex
|
||||
}
|
||||
|
||||
return addr, port
|
||||
return idx, addr, port
|
||||
}
|
||||
|
||||
func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision {
|
||||
|
|
|
@ -2105,14 +2105,18 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
input CheckServiceNode
|
||||
lanAddr string
|
||||
lanPort int
|
||||
lanIdx uint64
|
||||
wanAddr string
|
||||
wanPort int
|
||||
wanIdx uint64
|
||||
}
|
||||
|
||||
nodeAddr := "10.1.2.3"
|
||||
nodeWANAddr := "198.18.19.20"
|
||||
nodeIdx := uint64(11)
|
||||
serviceAddr := "10.2.3.4"
|
||||
servicePort := 1234
|
||||
serviceIdx := uint64(22)
|
||||
serviceWANAddr := "198.19.20.21"
|
||||
serviceWANPort := 987
|
||||
|
||||
|
@ -2121,15 +2125,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
input: CheckServiceNode{
|
||||
Node: &Node{
|
||||
Address: nodeAddr,
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Port: servicePort,
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: nodeAddr,
|
||||
lanIdx: nodeIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: nodeAddr,
|
||||
wanIdx: nodeIdx,
|
||||
wanPort: servicePort,
|
||||
},
|
||||
"node-wan-address": {
|
||||
|
@ -2139,15 +2151,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": nodeWANAddr,
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Port: servicePort,
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: nodeAddr,
|
||||
lanIdx: nodeIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: nodeWANAddr,
|
||||
wanIdx: nodeIdx,
|
||||
wanPort: servicePort,
|
||||
},
|
||||
"service-address": {
|
||||
|
@ -2158,16 +2178,24 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": nodeWANAddr,
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Address: serviceAddr,
|
||||
Port: servicePort,
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: serviceAddr,
|
||||
lanIdx: serviceIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: serviceAddr,
|
||||
wanIdx: serviceIdx,
|
||||
wanPort: servicePort,
|
||||
},
|
||||
"service-wan-address": {
|
||||
|
@ -2178,6 +2206,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": nodeWANAddr,
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Address: serviceAddr,
|
||||
|
@ -2188,12 +2219,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
Port: serviceWANPort,
|
||||
},
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: serviceAddr,
|
||||
lanIdx: serviceIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: serviceWANAddr,
|
||||
wanIdx: serviceIdx,
|
||||
wanPort: serviceWANPort,
|
||||
},
|
||||
"service-wan-address-default-port": {
|
||||
|
@ -2204,6 +2240,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": nodeWANAddr,
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Address: serviceAddr,
|
||||
|
@ -2214,12 +2253,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
Port: 0,
|
||||
},
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: serviceAddr,
|
||||
lanIdx: serviceIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: serviceWANAddr,
|
||||
wanIdx: serviceIdx,
|
||||
wanPort: servicePort,
|
||||
},
|
||||
"service-wan-address-node-lan": {
|
||||
|
@ -2230,6 +2274,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
TaggedAddresses: map[string]string{
|
||||
"wan": nodeWANAddr,
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: nodeIdx,
|
||||
},
|
||||
},
|
||||
Service: &NodeService{
|
||||
Port: servicePort,
|
||||
|
@ -2239,12 +2286,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
Port: serviceWANPort,
|
||||
},
|
||||
},
|
||||
RaftIndex: RaftIndex{
|
||||
ModifyIndex: serviceIdx,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
lanAddr: nodeAddr,
|
||||
lanIdx: nodeIdx,
|
||||
lanPort: servicePort,
|
||||
wanAddr: serviceWANAddr,
|
||||
wanIdx: serviceIdx,
|
||||
wanPort: serviceWANPort,
|
||||
},
|
||||
}
|
||||
|
@ -2254,13 +2306,15 @@ func TestCheckServiceNode_BestAddress(t *testing.T) {
|
|||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
|
||||
addr, port := tc.input.BestAddress(false)
|
||||
idx, addr, port := tc.input.BestAddress(false)
|
||||
require.Equal(t, tc.lanAddr, addr)
|
||||
require.Equal(t, tc.lanPort, port)
|
||||
require.Equal(t, tc.lanIdx, idx)
|
||||
|
||||
addr, port = tc.input.BestAddress(true)
|
||||
idx, addr, port = tc.input.BestAddress(true)
|
||||
require.Equal(t, tc.wanAddr, addr)
|
||||
require.Equal(t, tc.wanPort, port)
|
||||
require.Equal(t, tc.wanIdx, idx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,36 +171,51 @@ func makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message,
|
|||
})
|
||||
}
|
||||
|
||||
for _, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
|
||||
// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
|
||||
name := "passthrough~" + passthrough.SNI
|
||||
for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams {
|
||||
for tid := range target {
|
||||
uid := proxycfg.NewUpstreamIDFromTargetID(tid)
|
||||
|
||||
c := envoy_cluster_v3.Cluster{
|
||||
Name: name,
|
||||
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
|
||||
},
|
||||
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
||||
sni := connect.ServiceSNI(
|
||||
uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
|
||||
|
||||
// TODO(tproxy) This should use the connection timeout configured on the upstream's config entry
|
||||
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||
}
|
||||
// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
|
||||
name := "passthrough~" + sni
|
||||
|
||||
commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf())
|
||||
err := injectSANMatcher(commonTLSContext, passthrough.SpiffeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", passthrough.SNI, err)
|
||||
c := envoy_cluster_v3.Cluster{
|
||||
Name: name,
|
||||
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
|
||||
},
|
||||
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
||||
|
||||
// TODO(tproxy) This should use the connection timeout configured on the upstream's config entry
|
||||
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||
}
|
||||
|
||||
spiffeID := connect.SpiffeIDService{
|
||||
Host: cfgSnap.Roots.TrustDomain,
|
||||
Partition: uid.PartitionOrDefault(),
|
||||
Namespace: uid.NamespaceOrDefault(),
|
||||
Datacenter: cfgSnap.Datacenter,
|
||||
Service: uid.Name,
|
||||
}
|
||||
|
||||
commonTLSContext := makeCommonTLSContextFromLeafWithoutParams(cfgSnap, cfgSnap.Leaf())
|
||||
err := injectSANMatcher(commonTLSContext, spiffeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
|
||||
}
|
||||
tlsContext := envoy_tls_v3.UpstreamTlsContext{
|
||||
CommonTlsContext: commonTLSContext,
|
||||
Sni: sni,
|
||||
}
|
||||
transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.TransportSocket = transportSocket
|
||||
clusters = append(clusters, &c)
|
||||
}
|
||||
tlsContext := envoy_tls_v3.UpstreamTlsContext{
|
||||
CommonTlsContext: commonTLSContext,
|
||||
Sni: passthrough.SNI,
|
||||
}
|
||||
transportSocket, err := makeUpstreamTLSTransportSocket(&tlsContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.TransportSocket = transportSocket
|
||||
clusters = append(clusters, &c)
|
||||
}
|
||||
|
||||
return clusters, nil
|
||||
|
@ -892,7 +907,7 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
|
|||
fallback *envoy_endpoint_v3.LbEndpoint
|
||||
)
|
||||
for i, e := range opts.hostnameEndpoints {
|
||||
addr, port := e.BestAddress(opts.isRemote)
|
||||
_, addr, port := e.BestAddress(opts.isRemote)
|
||||
uniqueHostnames[addr] = true
|
||||
|
||||
health, weight := calculateEndpointHealthAndWeight(e, opts.onlyPassing)
|
||||
|
|
|
@ -678,28 +678,14 @@ func TestClustersFromSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// We add a passthrough cluster for each upstream service name
|
||||
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
|
||||
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
|
||||
kafkaUID: {
|
||||
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
SpiffeID: connect.SpiffeIDService{
|
||||
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
Namespace: "default",
|
||||
Datacenter: "dc1",
|
||||
Service: "kafka",
|
||||
},
|
||||
Addrs: map[string]struct{}{
|
||||
"kafka.default.default.dc1": map[string]struct{}{
|
||||
"9.9.9.9": {},
|
||||
},
|
||||
},
|
||||
mongoUID: {
|
||||
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
SpiffeID: connect.SpiffeIDService{
|
||||
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
Namespace: "default",
|
||||
Datacenter: "dc1",
|
||||
Service: "mongo",
|
||||
},
|
||||
Addrs: map[string]struct{}{
|
||||
"mongo.default.default.dc1": map[string]struct{}{
|
||||
"10.10.10.10": {},
|
||||
"10.10.10.12": {},
|
||||
},
|
||||
|
|
|
@ -221,7 +221,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
||||
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
|
||||
|
||||
addr, port := srv.BestAddress(false /*wan*/)
|
||||
_, addr, port := srv.BestAddress(false /*wan*/)
|
||||
|
||||
lbEndpoint := &envoy_endpoint_v3.LbEndpoint{
|
||||
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
|
||||
|
@ -512,7 +512,7 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
|
|||
|
||||
for _, ep := range endpoints {
|
||||
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
|
||||
addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
|
||||
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
|
||||
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
|
||||
|
||||
if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
|
||||
|
|
|
@ -218,26 +218,27 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
// as opposed to via a virtual IP.
|
||||
var passthroughChains []*envoy_listener_v3.FilterChain
|
||||
|
||||
for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
|
||||
u := structs.Upstream{
|
||||
DestinationName: uid.Name,
|
||||
DestinationNamespace: uid.NamespaceOrDefault(),
|
||||
DestinationPartition: uid.PartitionOrDefault(),
|
||||
for _, targets := range cfgSnap.ConnectProxy.PassthroughUpstreams {
|
||||
for tid, addrs := range targets {
|
||||
uid := proxycfg.NewUpstreamIDFromTargetID(tid)
|
||||
|
||||
sni := connect.ServiceSNI(
|
||||
uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
|
||||
|
||||
filterName := fmt.Sprintf("%s.%s.%s.%s", uid.Name, uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter)
|
||||
|
||||
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
|
||||
clusterName: "passthrough~" + sni,
|
||||
filterName: filterName,
|
||||
protocol: "tcp",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(addrs)
|
||||
|
||||
passthroughChains = append(passthroughChains, filterChain)
|
||||
}
|
||||
|
||||
filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter)
|
||||
|
||||
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
|
||||
clusterName: "passthrough~" + passthrough.SNI,
|
||||
filterName: filterName,
|
||||
protocol: "tcp",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filterChain.FilterChainMatch = makeFilterChainMatchFromAddrs(passthrough.Addrs)
|
||||
|
||||
passthroughChains = append(passthroughChains, filterChain)
|
||||
}
|
||||
|
||||
outboundListener.FilterChains = append(outboundListener.FilterChains, passthroughChains...)
|
||||
|
|
|
@ -1211,16 +1211,14 @@ func TestListenersFromSnapshot(t *testing.T) {
|
|||
|
||||
// We add a filter chains for each passthrough service name.
|
||||
// The filter chain will route to a cluster with the same SNI name.
|
||||
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
|
||||
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]map[string]map[string]struct{}{
|
||||
kafkaUID: {
|
||||
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
Addrs: map[string]struct{}{
|
||||
"kafka.default.default.dc1": map[string]struct{}{
|
||||
"9.9.9.9": {},
|
||||
},
|
||||
},
|
||||
mongoUID: {
|
||||
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
Addrs: map[string]struct{}{
|
||||
"mongo.default.default.dc1": map[string]struct{}{
|
||||
"10.10.10.10": {},
|
||||
"10.10.10.12": {},
|
||||
},
|
||||
|
|
|
@ -206,7 +206,7 @@
|
|||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
"name": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
"type": "ORIGINAL_DST",
|
||||
"connectTimeout": "5s",
|
||||
"lbPolicy": "CLUSTER_PROVIDED",
|
||||
|
@ -234,18 +234,18 @@
|
|||
},
|
||||
"matchSubjectAltNames": [
|
||||
{
|
||||
"exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/kafka"
|
||||
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/kafka"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"sni": "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
|
||||
"sni": "kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
|
||||
"name": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
"type": "ORIGINAL_DST",
|
||||
"connectTimeout": "5s",
|
||||
"lbPolicy": "CLUSTER_PROVIDED",
|
||||
|
@ -273,12 +273,12 @@
|
|||
},
|
||||
"matchSubjectAltNames": [
|
||||
{
|
||||
"exact": "spiffe://e5b08d03-bfc3-c870-1833-baddb116e648.consul/ns/default/dc/dc1/svc/mongo"
|
||||
"exact": "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"sni": "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
|
||||
"sni": "mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
|
||||
"statPrefix": "upstream.mongo.default.default.dc1",
|
||||
"cluster": "passthrough~mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
|
||||
"cluster": "passthrough~mongo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
@ -95,7 +95,7 @@
|
|||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
|
||||
"statPrefix": "upstream.kafka.default.default.dc1",
|
||||
"cluster": "passthrough~kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul"
|
||||
"cluster": "passthrough~kafka.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
Loading…
Reference in New Issue