Add support for transparent proxy in xDS generation

pull/9900/head
Freddy 2021-03-17 22:29:05 -06:00 committed by GitHub
commit fc02bb7969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1783 additions and 256 deletions

3
.changelog/9894.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
connect: Add support for transparently proxying traffic through Envoy. [experimental]
```

View File

@ -3708,6 +3708,8 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})
a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a})
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})

View File

@ -0,0 +1,52 @@
package cachetype
import (
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const IntentionUpstreamsName = "intention-upstreams"
// GatewayUpstreams supports fetching upstreams for a given gateway name.
type IntentionUpstreams struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
func (i *IntentionUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup
// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true
// Fetch
var reply structs.IndexedServiceList
if err := i.RPC.RPC("Internal.IntentionUpstreams", reqReal, &reply); err != nil {
return result, err
}
result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}

View File

@ -0,0 +1,52 @@
package cachetype
import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestIntentionUpstreams(t *testing.T) {
rpc := TestRPC(t)
typ := &IntentionUpstreams{RPC: rpc}
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, "foo", req.ServiceName)
services := structs.ServiceList{
{Name: "foo"},
}
reply := args.Get(2).(*structs.IndexedServiceList)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply
})
// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "foo",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)
rpc.AssertExpectations(t)
}

View File

@ -1001,6 +1001,9 @@ func (s *Store) IntentionTopology(ws memdb.WatchSet,
}
result := make(structs.ServiceList, 0, len(allServices))
for _, candidate := range allServices {
if candidate.Name == structs.ConsulServiceName {
continue
}
decision, err := s.IntentionDecision(candidate.Name, candidate.NamespaceOrDefault(), intentions, decisionMatchType, defaultDecision, true)
if err != nil {
src, dst := target, candidate

View File

@ -1889,6 +1889,11 @@ func TestStore_IntentionTopology(t *testing.T) {
Address: "127.0.0.1",
}
services := []structs.NodeService{
{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
ID: "api-1",
Service: "api",
@ -1960,7 +1965,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "mysql",
@ -1987,7 +1992,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
@ -2014,7 +2019,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "ingress-gateway",
@ -2045,7 +2050,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "web",
@ -2072,7 +2077,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
@ -2103,7 +2108,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{},
},
},
@ -2125,7 +2130,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",

View File

@ -1,6 +1,7 @@
package proxycfg
import (
"context"
"path"
"testing"
"time"
@ -106,6 +107,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
)
}
upstreams := structs.TestUpstreams(t)
for i := range upstreams {
upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace
}
webProxy := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
@ -120,7 +126,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
}
@ -213,7 +219,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbDefaultChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"db.default.dc1": TestUpstreamNodes(t),
@ -223,6 +230,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
@ -262,7 +273,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbSplitChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"v1.db.default.dc1": TestUpstreamNodes(t),
@ -273,6 +285,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},

View File

@ -18,6 +18,13 @@ type ConfigSnapshotUpstreams struct {
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the proxy's configuration is
// changed. Ingress gateways and transparent proxies need this because
// discovery chain watches are added and removed through the lifecycle
// of a single proxycfg state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
@ -36,6 +43,9 @@ type ConfigSnapshotUpstreams struct {
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[string]*structs.Upstream
}
type configSnapshotConnectProxy struct {
@ -58,12 +68,14 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool {
return c.Leaf == nil &&
!c.IntentionsSet &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0 &&
len(c.WatchedGateways) == 0 &&
len(c.WatchedGatewayEndpoints) == 0 &&
len(c.WatchedServiceChecks) == 0 &&
len(c.PreparedQueryEndpoints) == 0
len(c.PreparedQueryEndpoints) == 0 &&
len(c.UpstreamConfig) == 0
}
type configSnapshotTerminatingGateway struct {
@ -287,12 +299,6 @@ type configSnapshotIngressGateway struct {
// to. This is constructed from the ingress-gateway config entry, and uses
// the GatewayServices RPC to retrieve them.
Upstreams map[IngressListenerKey]structs.Upstreams
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the ingress gateway configuration is
// changed. Ingress gateways need this because discovery chain watches are
// added and removed through the lifecycle of single proxycfg.state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
}
func (c *configSnapshotIngressGateway) IsEmpty() bool {
@ -301,7 +307,6 @@ func (c *configSnapshotIngressGateway) IsEmpty() bool {
}
return len(c.Upstreams) == 0 &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0
}

View File

@ -45,6 +45,7 @@ const (
serviceConfigIDPrefix = "service-config:"
serviceResolverIDPrefix = "service-resolver:"
serviceIntentionsIDPrefix = "service-intentions:"
intentionUpstreamsID = "intention-upstreams"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
@ -182,13 +183,14 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
s.ctx, s.cancel = context.WithCancel(context.Background())
err := s.initWatches()
snap := s.initialConfigSnapshot()
err := s.initWatches(&snap)
if err != nil {
s.cancel()
return nil, err
}
go s.run()
go s.run(&snap)
return s.snapCh, nil
}
@ -202,10 +204,10 @@ func (s *state) Close() error {
}
// initWatches sets up the watches needed for the particular service
func (s *state) initWatches() error {
func (s *state) initWatches(snap *ConfigSnapshot) error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.initWatchesConnectProxy()
return s.initWatchesConnectProxy(snap)
case structs.ServiceKindTerminatingGateway:
return s.initWatchesTerminatingGateway()
case structs.ServiceKindMeshGateway:
@ -250,7 +252,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
// state.
func (s *state) initWatchesConnectProxy() error {
func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
// Watch for root changes
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
@ -302,12 +304,38 @@ func (s *state) initWatchesConnectProxy() error {
// default the namespace to the namespace of this proxy service
currentNamespace := s.proxyID.NamespaceOrDefault()
if s.proxyCfg.TransparentProxy {
// When in transparent proxy we will infer upstreams from intentions with this source
err := s.cache.Notify(s.ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.proxyCfg.DestinationServiceName,
EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()),
}, intentionUpstreamsID, s.ch)
if err != nil {
return err
}
}
// Watch for updates to service endpoints for all upstreams
for _, u := range s.proxyCfg.Upstreams {
for i := range s.proxyCfg.Upstreams {
u := s.proxyCfg.Upstreams[i]
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
// Watches should not be created for them.
if u.CentrallyConfigured {
continue
}
snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u
dc := s.source.Datacenter
if u.Datacenter != "" {
dc = u.Datacenter
}
if s.proxyCfg.TransparentProxy && (dc == "" || dc == s.source.Datacenter) {
// In TransparentProxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
continue
}
ns := currentNamespace
if u.DestinationNamespace != "" {
@ -548,12 +576,14 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
case structs.ServiceKindTerminatingGateway:
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc)
@ -589,15 +619,13 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
return snap
}
func (s *state) run() {
func (s *state) run(snap *ConfigSnapshot) {
// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
// gets closed from another goroutine.
defer close(s.snapCh)
snap := s.initialConfigSnapshot()
// This turns out to be really fiddly/painful by just using time.Timer.C
// directly in the code below since you can't detect when a timer is stopped
// vs waiting in order to know to reset it. So just use a chan to send
@ -612,7 +640,7 @@ func (s *state) run() {
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update")
if err := s.handleUpdate(u, &snap); err != nil {
if err := s.handleUpdate(u, snap); err != nil {
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
@ -741,6 +769,68 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
}
snap.ConnectProxy.IntentionsSet = true
case u.CorrelationID == intentionUpstreamsID:
resp, ok := u.Result.(*structs.IndexedServiceList)
if !ok {
return fmt.Errorf("invalid type for response %T", u.Result)
}
seenServices := make(map[string]struct{})
for _, svc := range resp.Services {
seenServices[svc.String()] = struct{}{}
cfgMap := make(map[string]interface{})
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
if ok {
cfgMap = u.Config
}
cfg, err := parseReducedUpstreamConfig(cfgMap)
if err != nil {
// Don't hard fail on a config typo, just warn. We'll fall back on
// the plain discovery chain if there is an error so it's safe to
// continue.
s.logger.Warn("failed to parse upstream config",
"upstream", u.Identifier(),
"error", err,
)
}
err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
}
}
// Clean up data from services that were not in the update
for sn := range snap.ConnectProxy.WatchedUpstreams {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreams, sn)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGateways {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGateways, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
if _, ok := seenServices[sn]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
delete(snap.ConnectProxy.DiscoveryChain, sn)
}
}
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
@ -1472,9 +1562,9 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
for _, service := range services.Services {
u := makeUpstream(service)
err := s.watchIngressDiscoveryChain(snap, u)
err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
if err != nil {
return err
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
watchedSvcs[u.Identifier()] = struct{}{}
@ -1522,25 +1612,36 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
return upstream
}
func (s *state) watchIngressDiscoveryChain(snap *ConfigSnapshot, u structs.Upstream) error {
if _, ok := snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()]; ok {
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
return nil
}
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: u.DestinationName,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: u.DestinationNamespace,
}, "discovery-chain:"+u.Identifier(), s.ch)
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: namespace,
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+id, s.ch)
if err != nil {
cancel()
return err
}
snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()] = cancel
switch s.kind {
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
default:
cancel()
return fmt.Errorf("unsupported kind %s", s.kind)
}
return nil
}

View File

@ -255,6 +255,17 @@ func genVerifyIntentionWatch(expectedService string, expectedDatacenter string)
}
}
func genVerifyIntentionUpstreamsWatch(expectedService string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.IntentionUpstreamsName, cacheType)
reqReal, ok := request.(*structs.ServiceSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
require.Equal(t, expectedService, reqReal.ServiceName)
}
}
func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.PreparedQueryName, cacheType)
@ -1509,6 +1520,247 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"transparent-proxy-initial": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: true,
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.ConnectProxy.IsEmpty())
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
},
},
"transparent-proxy-handle-update": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: true,
},
},
sourceDC: "dc1",
stages: []verificationStage{
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.ConnectProxy.IsEmpty())
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
// Valid snapshot after roots, leaf, and intentions
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
// Receiving an intention should lead to spinning up a discovery chain watch
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{
db,
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
// Should start watch for db's chain
require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbStr)
// Should not have results yet
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + dbStr: genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: dbStr,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + dbStr,
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbStr], 1)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:db.default.dc1:db": genVerifyServiceWatch("db", "", "dc1", true),
},
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:db.default.dc1:db",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "db1",
Service: "db",
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbStr)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], "db.default.dc1")
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr]["db.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "db1",
Service: "db",
},
},
},
)
},
},
// Empty list of upstreams should clean everything up
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
// Empty intention upstreams leads to cancelling all associated watches
require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains)
require.Empty(t, snap.ConnectProxy.WatchedUpstreams)
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Empty(t, snap.ConnectProxy.WatchedGateways)
require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}
@ -1542,12 +1794,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// setup the ctx as initWatches expects this to be there
state.ctx, state.cancel = context.WithCancel(context.Background())
// ensure the initial watch setup did not error
require.NoError(t, state.initWatches())
// get the initial configuration snapshot
snap := state.initialConfigSnapshot()
// ensure the initial watch setup did not error
require.NoError(t, state.initWatches(&snap))
//--------------------------------------------------------------------
//
// All the nested subtests here are to make failures easier to

View File

@ -653,6 +653,8 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
t, "db", "default", "dc1",
connect.TestClusterID+".consul", "dc1", nil)
upstreams := structs.TestUpstreams(t)
return &ConfigSnapshot{
Kind: structs.ServiceKindConnectProxy,
Service: "web-sidecar-proxy",
@ -667,12 +669,13 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
Leaf: leaf,
UpstreamConfig: upstreams.ToMap(),
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
@ -1315,6 +1318,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", connect.TestClusterID+".consul", "dc1", compileSetup, entries...)
upstreams := structs.TestUpstreams(t)
snap := ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
@ -1325,6 +1329,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
"db.default.dc1": TestUpstreamNodes(t),
},
},
UpstreamConfig: upstreams.ToMap(),
}
switch variation {

View File

@ -384,7 +384,7 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfig(us.Config)
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
}

View File

@ -701,8 +701,8 @@ func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) {
func (cfg *UpstreamConfig) Normalize() {
cfg.Protocol = strings.ToLower(cfg.Protocol)
if cfg.ConnectTimeoutMs < 1 {
cfg.ConnectTimeoutMs = 5000
if cfg.ConnectTimeoutMs < 0 {
cfg.ConnectTimeoutMs = 0
}
}
@ -744,6 +744,8 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
}
err = decoder.Decode(m)
cfg.Normalize()
return cfg, err
}
@ -753,8 +755,13 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned)
cfg.Normalize()
// Set default (even if error is returned)
if cfg.Protocol == "" {
cfg.Protocol = "tcp"
}
if cfg.ConnectTimeoutMs == 0 {
cfg.ConnectTimeoutMs = 5000
}
return cfg, err
}

View File

@ -1572,14 +1572,14 @@ func TestServiceConfigEntry_Normalize(t *testing.T) {
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
Protocol: "tcp",
ConnectTimeoutMs: 5000,
ConnectTimeoutMs: 0,
},
"memcached": {
ConnectTimeoutMs: 5000,
ConnectTimeoutMs: 0,
},
},
UpstreamDefaults: &UpstreamConfig{
ConnectTimeoutMs: 5000,
ConnectTimeoutMs: 0,
},
},
},
@ -1751,6 +1751,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: nil,
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
@ -1758,6 +1759,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
@ -1768,6 +1770,7 @@ func TestParseUpstreamConfig(t *testing.T) {
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
@ -1787,6 +1790,7 @@ func TestParseUpstreamConfig(t *testing.T) {
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
@ -1796,6 +1800,7 @@ func TestParseUpstreamConfig(t *testing.T) {
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
@ -1805,6 +1810,7 @@ func TestParseUpstreamConfig(t *testing.T) {
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
@ -1823,6 +1829,7 @@ func TestParseUpstreamConfig(t *testing.T) {
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
},
Protocol: "tcp",
},
},
{
@ -1841,6 +1848,7 @@ func TestParseUpstreamConfig(t *testing.T) {
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
},
Protocol: "tcp",
},
},
{
@ -1857,6 +1865,7 @@ func TestParseUpstreamConfig(t *testing.T) {
Interval: 22 * time.Second,
MaxFailures: 7,
},
Protocol: "tcp",
},
},
{
@ -1871,6 +1880,7 @@ func TestParseUpstreamConfig(t *testing.T) {
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
Protocol: "tcp",
},
},
}

View File

@ -215,6 +215,15 @@ func (us Upstreams) ToAPI() []api.Upstream {
return a
}
func (us Upstreams) ToMap() map[string]*Upstream {
upstreamMap := make(map[string]*Upstream)
for i := range us {
upstreamMap[us[i].Identifier()] = &us[i]
}
return upstreamMap
}
// UpstreamsFromAPI is a helper for converting api.Upstream to Upstream.
func UpstreamsFromAPI(us []api.Upstream) Upstreams {
a := make([]Upstream, len(us))

View File

@ -46,44 +46,57 @@ func (s *Server) clustersFromSnapshot(_ connectionInfo, cfgSnap *proxycfg.Config
// clustersFromSnapshot returns the xDS API representation of the "clusters"
// (upstreams) in the snapshot.
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
// TODO(rb): this sizing is a low bound.
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
// This sizing is a lower bound.
clusters := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.DiscoveryChain)+1)
// Include the "app" cluster for the public listener
appCluster, err := s.makeAppCluster(cfgSnap, LocalAppClusterName, "", cfgSnap.Proxy.LocalServicePort)
if err != nil {
return nil, err
}
clusters = append(clusters, appCluster)
for _, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
// In TransparentProxy mode there needs to be a passthrough cluster for traffic going to destinations
// that aren't in Consul's catalog.
// TODO (freddy): Add cluster-wide setting that can disable this cluster and restrict traffic to catalog destinations.
if cfgSnap.Proxy.TransparentProxy {
clusters = append(clusters, &envoy_cluster_v3.Cluster{
Name: OriginalDestinationClusterName,
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
Type: envoy_cluster_v3.Cluster_ORIGINAL_DST,
},
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
})
}
if u.DestinationType == structs.UpstreamDestTypePreparedQuery {
upstreamCluster, err := s.makeUpstreamClusterForPreparedQuery(u, cfgSnap)
if err != nil {
return nil, err
}
clusters = append(clusters, upstreamCluster)
} else {
chain := cfgSnap.ConnectProxy.DiscoveryChain[id]
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
for _, cluster := range upstreamClusters {
clusters = append(clusters, cluster)
}
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, cfgSnap.ConnectProxy.UpstreamConfig[id], chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
for _, cluster := range upstreamClusters {
clusters = append(clusters, cluster)
}
}
for _, u := range cfgSnap.Proxy.Upstreams {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
}
upstreamCluster, err := s.makeUpstreamClusterForPreparedQuery(u, cfgSnap)
if err != nil {
return nil, err
}
clusters = append(clusters, upstreamCluster)
}
cfgSnap.Proxy.Expose.Finalize()
@ -316,7 +329,7 @@ func (s *Server) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnap
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, chainEndpoints, cfgSnap)
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, &u, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
@ -439,16 +452,21 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
}
func (s *Server) makeUpstreamClustersForDiscoveryChain(
upstream structs.Upstream,
id string,
upstream *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
chainEndpoints map[string]structs.CheckServiceNodes,
cfgSnap *proxycfg.ConfigSnapshot,
) ([]*envoy_cluster_v3.Cluster, error) {
if chain == nil {
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier())
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", id)
}
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
configMap := make(map[string]interface{})
if upstream != nil {
configMap = upstream.Config
}
cfg, err := structs.ParseUpstreamConfigNoDefaults(configMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.

View File

@ -67,6 +67,15 @@ func TestClustersFromSnapshot(t *testing.T) {
customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
})
snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{
"db": {
Config: map[string]interface{}{
"envoy_cluster_json": customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
}),
},
},
}
},
},
{
@ -616,6 +625,13 @@ func TestClustersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotIngress_MultipleListenersDuplicateService,
setup: nil,
},
{
name: "transparent-proxy",
create: proxycfg.TestConfigSnapshot,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.TransparentProxy = true
},
},
}
latestEnvoyVersion := proxysupport.EnvoyVersions[0]

View File

@ -47,45 +47,41 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
resources := make([]proto.Message, 0,
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
es := s.endpointsFromDiscoveryChain(
id,
chain,
cfgSnap.Datacenter,
cfgSnap.ConnectProxy.UpstreamConfig[id],
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
)
resources = append(resources, es...)
}
// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
for _, u := range cfgSnap.Proxy.Upstreams {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
}
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
dc := u.Datacenter
if dc == "" {
dc = cfgSnap.Datacenter
}
clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)
if chain == nil {
// We ONLY want this branch for prepared queries.
dc := u.Datacenter
if dc == "" {
dc = cfgSnap.Datacenter
}
clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id]
if ok {
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
)
resources = append(resources, la)
}
} else {
// Newfangled discovery chain plumbing.
es := s.endpointsFromDiscoveryChain(
u,
chain,
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id]
if ok {
la := makeLoadAssignment(
clusterName,
[]loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
)
resources = append(resources, es...)
resources = append(resources, la)
}
}
@ -277,9 +273,10 @@ func (s *Server) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSna
}
es := s.endpointsFromDiscoveryChain(
u,
id,
cfgSnap.IngressGateway.DiscoveryChain[id],
cfgSnap.Datacenter,
&u,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[id],
)
@ -301,9 +298,10 @@ func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
}
func (s *Server) endpointsFromDiscoveryChain(
upstream structs.Upstream,
id string,
chain *structs.CompiledDiscoveryChain,
datacenter string,
upstream *structs.Upstream,
upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
) []proto.Message {
var resources []proto.Message
@ -312,11 +310,15 @@ func (s *Server) endpointsFromDiscoveryChain(
return resources
}
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
configMap := make(map[string]interface{})
if upstream != nil {
configMap = upstream.Config
}
cfg, err := structs.ParseUpstreamConfigNoDefaults(configMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(),
s.Logger.Warn("failed to parse", "upstream", id,
"error", err)
}
@ -331,7 +333,7 @@ func (s *Server) endpointsFromDiscoveryChain(
}
} else {
s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for",
"discovery chain", chain.ServiceName, "upstream", upstream.Identifier(),
"discovery chain", chain.ServiceName, "upstream", id,
"envoy_cluster_json", chain.ServiceName)
}
}

View File

@ -317,6 +317,15 @@ func TestEndpointsFromSnapshot(t *testing.T) {
customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
})
snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{
"db": {
Config: map[string]interface{}{
"envoy_cluster_json": customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
}),
},
},
}
},
},
{

View File

@ -32,6 +32,11 @@ import (
"github.com/hashicorp/consul/logging"
)
const (
// TODO (freddy) Make this configurable
TProxyOutboundPort = 15001
)
// listenersFromSnapshot returns the xDS API representation of the "listeners" in the snapshot.
func (s *Server) listenersFromSnapshot(cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
if cfgSnap == nil {
@ -54,27 +59,83 @@ func (s *Server) listenersFromSnapshot(cInfo connectionInfo, cfgSnap *proxycfg.C
// listenersFromSnapshotConnectProxy returns the "listeners" for a connect proxy service
func (s *Server) listenersFromSnapshotConnectProxy(cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
// One listener for each upstream plus the public one
resources := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
resources := make([]proto.Message, 1)
// Configure public listener
var err error
resources[0], err = s.makePublicListener(cInfo, cfgSnap)
// Configure inbound listener.
resources[0], err = s.makeInboundListener(cInfo, cfgSnap, PublicListenerName)
if err != nil {
return nil, err
}
for i, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
// This outboundListener is exclusively used when TransparentProxy mode is active.
// In that situation there is a single listener where we are redirecting outbound traffic,
// and each upstream gets a filter chain attached to that listener.
var outboundListener *envoy_listener_v3.Listener
if cfgSnap.Proxy.TransparentProxy {
outboundListener = makeListener(OutboundListenerName, "127.0.0.1", TProxyOutboundPort, envoy_core_v3.TrafficDirection_OUTBOUND)
outboundListener.FilterChains = make([]*envoy_listener_v3.FilterChain, 0)
outboundListener.ListenerFilters = []*envoy_listener_v3.ListenerFilter{
{
// TODO (freddy): Hard-coded until we upgrade the go-control-plane library
Name: "envoy.filters.listener.original_dst",
},
}
}
var hasFilterChains bool
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
cfg := getAndModifyUpstreamConfigForListener(s.Logger, id, upstreamCfg, chain)
// If escape hatch is present, create a listener from it and move on to the next
if cfg.EnvoyListenerJSON != "" {
upstreamListener, err := makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
if err != nil {
return nil, err
}
resources = append(resources, upstreamListener)
continue
}
var upstreamListener proto.Message
upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(
&u,
u.LocalBindAddress,
// Generate the upstream listeners for when they are explicitly set with a local bind port
if outboundListener == nil || (upstreamCfg != nil && upstreamCfg.LocalBindPort != 0) {
address := "127.0.0.1"
if upstreamCfg.LocalBindAddress != "" {
address = upstreamCfg.LocalBindAddress
}
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
id,
"",
cfg.Protocol,
upstreamCfg,
chain,
cfgSnap,
nil,
)
if err != nil {
return nil, err
}
upstreamListener := makeListener(id, address, upstreamCfg.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
resources = append(resources, upstreamListener)
// Avoid creating filter chains below for upstreams that have dedicated listeners
continue
}
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
id,
"",
cfg.Protocol,
upstreamCfg,
chain,
cfgSnap,
nil,
@ -82,7 +143,138 @@ func (s *Server) listenersFromSnapshotConnectProxy(cInfo connectionInfo, cfgSnap
if err != nil {
return nil, err
}
resources[i+1] = upstreamListener
// For filter chains used by the transparent proxy, we need to match on multiple destination addresses.
// These might be: the ClusterIP in k8s, or any of the service instance addresses.
endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
uniqueAddrs := make(map[string]struct{})
for _, t := range chain.Targets {
var k8sNamespace string
// Store all the IP addresses per unique port
for _, e := range endpoints[t.ID] {
addr, _ := e.BestAddress(false)
if _, ok := uniqueAddrs[addr]; !ok {
uniqueAddrs[addr] = struct{}{}
}
// The k8s namespace should be the same for all instances, so pick any
if ns, ok := e.Service.Meta["k8s-namespace"]; ok {
k8sNamespace = ns
}
}
// TODO (freddy) hack to remove for beta: for every potential discovery chain target, resolve the k8s ClusterIP
// since it's not stored in Consul's catalog (yet)
if k8sNamespace != "" {
host := fmt.Sprintf("%s.%s.svc.cluster.local", t.Service, k8sNamespace)
resolved, err := net.LookupHost(host)
if err != nil {
// We still have the Pod ips in the catalog, so don't hard-fail on errors
s.Logger.Warn("failed to resolve", "host", host, "error", err)
continue
}
for _, addr := range resolved {
if _, ok := uniqueAddrs[addr]; !ok {
uniqueAddrs[addr] = struct{}{}
}
}
}
}
// For every potential address we collected, create the appropriate address prefix to match on.
// In this case we are matching on exact addresses, so the prefix is the address itself,
// and the prefix length is based on whether it's IPv4 or IPv6.
ranges := make([]*envoy_core_v3.CidrRange, 0)
for addr := range uniqueAddrs {
ip := net.ParseIP(addr)
if ip == nil {
continue
}
pfxLen := uint32(32)
if ip.To4() == nil {
pfxLen = 128
}
ranges = append(ranges, &envoy_core_v3.CidrRange{
AddressPrefix: addr,
PrefixLen: &wrappers.UInt32Value{Value: pfxLen},
})
}
filterChain.FilterChainMatch = &envoy_listener_v3.FilterChainMatch{
PrefixRanges: ranges,
}
// Only attach the filter chain if there are addresses to match on
if len(ranges) > 0 {
outboundListener.FilterChains = append(outboundListener.FilterChains, filterChain)
}
hasFilterChains = true
}
// Only create the outbound listener when there are upstreams and filter chains are present
if outboundListener != nil && hasFilterChains {
// Filter chains are stable sorted to avoid draining if the list is provided out of order
sort.SliceStable(outboundListener.FilterChains, func(i, j int) bool {
return outboundListener.FilterChains[i].Name < outboundListener.FilterChains[j].Name
})
// Add a catch-all filter chain that acts as a TCP proxy to non-catalog destinations
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
"passthrough",
OriginalDestinationClusterName,
"tcp",
nil,
nil,
cfgSnap,
nil,
)
if err != nil {
return nil, err
}
outboundListener.FilterChains = append(outboundListener.FilterChains, filterChain)
resources = append(resources, outboundListener)
}
// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
for id, u := range cfgSnap.ConnectProxy.UpstreamConfig {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
}
cfg, err := structs.ParseUpstreamConfig(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
}
address := "127.0.0.1"
if u.LocalBindAddress != "" {
address = u.LocalBindAddress
}
upstreamListener := makeListener(id, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
id,
"",
cfg.Protocol,
u,
nil,
cfgSnap,
nil,
)
if err != nil {
return nil, err
}
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
resources = append(resources, upstreamListener)
}
cfgSnap.Proxy.Expose.Finalize()
@ -423,7 +615,7 @@ const (
)
// Locate the existing http connect manager L4 filter and inject our RBAC filter at the top.
func (s *Server) injectHTTPFilterOnFilterChains(
func injectHTTPFilterOnFilterChains(
listener *envoy_listener_v3.Listener,
authzFilter *envoy_http_v3.HttpFilter,
) error {
@ -503,7 +695,7 @@ func (s *Server) injectConnectTLSOnFilterChains(_ connectionInfo, cfgSnap *proxy
return nil
}
func (s *Server) makePublicListener(cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot) (proto.Message, error) {
func (s *Server) makeInboundListener(cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot, name string) (proto.Message, error) {
var l *envoy_listener_v3.Listener
var err error
@ -514,106 +706,120 @@ func (s *Server) makePublicListener(cInfo connectionInfo, cfgSnap *proxycfg.Conf
s.Logger.Warn("failed to parse Connect.Proxy.Config", "error", err)
}
if cfg.PublicListenerJSON != "" {
l, err = makeListenerFromUserConfig(cfg.PublicListenerJSON)
if err != nil {
return l, err
}
// In the happy path don't return yet as we need to inject TLS and authz config still.
}
// This controls if we do L4 or L7 intention checks.
useHTTPFilter := structs.IsProtocolHTTPLike(cfg.Protocol)
if l == nil {
// No user config, use default listener
addr := cfgSnap.Address
// Override with bind address if one is set, otherwise default
// to 0.0.0.0
if cfg.BindAddress != "" {
addr = cfg.BindAddress
} else if addr == "" {
addr = "0.0.0.0"
}
// Override with bind port if one is set, otherwise default to
// proxy service's address
port := cfgSnap.Port
if cfg.BindPort != 0 {
port = cfg.BindPort
}
l = makeListener(PublicListenerName, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
opts := listenerFilterOpts{
useRDS: false,
protocol: cfg.Protocol,
filterName: "public_listener",
routeName: "public_listener",
cluster: LocalAppClusterName,
statPrefix: "",
routePath: "",
requestTimeoutMs: cfg.LocalRequestTimeoutMs,
// Generate and return custom public listener from config if one was provided.
if cfg.PublicListenerJSON != "" {
l, err = makeListenerFromUserConfig(cfg.PublicListenerJSON)
if err != nil {
return nil, err
}
// For HTTP-like services attach an RBAC http filter and do a best-effort insert
if useHTTPFilter {
opts.httpAuthzFilter, err = makeRBACHTTPFilter(
httpAuthzFilter, err := makeRBACHTTPFilter(
cfgSnap.ConnectProxy.Intentions,
cfgSnap.IntentionDefaultAllow,
)
if err != nil {
return nil, err
}
// Try our best to inject the HTTP RBAC filter.
if err := injectHTTPFilterOnFilterChains(l, httpAuthzFilter); err != nil {
s.Logger.Warn(
"could not inject the HTTP RBAC filter to enforce intentions on user-provided "+
"'envoy_public_listener_json' config; falling back on the RBAC network filter instead",
"proxy", cfgSnap.ProxyID,
"error", err,
)
// If we get an error inject the RBAC network filter instead.
useHTTPFilter = false
}
}
filter, err := makeListenerFilter(opts)
err := s.finalizePublicListenerFromConfig(l, cInfo, cfgSnap, useHTTPFilter)
if err != nil {
return nil, err
}
l.FilterChains = []*envoy_listener_v3.FilterChain{
{
Filters: []*envoy_listener_v3.Filter{
filter,
},
},
return nil, fmt.Errorf("failed to attach Consul filters and TLS context to custom public listener: %v", err)
}
return l, nil
}
} else if useHTTPFilter {
httpAuthzFilter, err := makeRBACHTTPFilter(
// No JSON user config, use default listener address
// Default to listening on all addresses, but override with bind address if one is set.
addr := cfgSnap.Address
if addr == "" {
addr = "0.0.0.0"
}
if cfg.BindAddress != "" {
addr = cfg.BindAddress
}
// Override with bind port if one is set, otherwise default to
// proxy service's address
port := cfgSnap.Port
if cfg.BindPort != 0 {
port = cfg.BindPort
}
l = makeListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
filterOpts := listenerFilterOpts{
protocol: cfg.Protocol,
filterName: name,
routeName: name,
cluster: LocalAppClusterName,
requestTimeoutMs: cfg.LocalRequestTimeoutMs,
}
if useHTTPFilter {
filterOpts.httpAuthzFilter, err = makeRBACHTTPFilter(
cfgSnap.ConnectProxy.Intentions,
cfgSnap.IntentionDefaultAllow,
)
if err != nil {
return nil, err
}
// We're using the listener escape hatch, so try our best to inject the
// HTTP RBAC filter, but if we can't then just inject the RBAC Network
// filter instead.
if err := s.injectHTTPFilterOnFilterChains(l, httpAuthzFilter); err != nil {
s.Logger.Warn(
"could not inject the HTTP RBAC filter to enforce intentions on user-provided 'envoy_public_listener_json' config; falling back on the RBAC network filter instead",
"proxy", cfgSnap.ProxyID,
"error", err,
)
useHTTPFilter = false
}
}
if !useHTTPFilter {
if err := s.injectConnectFilters(cInfo, cfgSnap, l); err != nil {
return nil, err
}
}
if err := s.injectConnectTLSOnFilterChains(cInfo, cfgSnap, l); err != nil {
filter, err := makeListenerFilter(filterOpts)
if err != nil {
return nil, err
}
l.FilterChains = []*envoy_listener_v3.FilterChain{
{
Filters: []*envoy_listener_v3.Filter{
filter,
},
},
}
err = s.finalizePublicListenerFromConfig(l, cInfo, cfgSnap, useHTTPFilter)
if err != nil {
return nil, fmt.Errorf("failed to attach Consul filters and TLS context to custom public listener: %v", err)
}
return l, err
}
// finalizePublicListenerFromConfig is used for best-effort injection of Consul filter-chains onto listeners.
// This include L4 authorization filters and TLS context.
func (s *Server) finalizePublicListenerFromConfig(l *envoy_listener_v3.Listener,
cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot, useHTTPFilter bool) error {
if !useHTTPFilter {
// Best-effort injection of L4 intentions
if err := s.injectConnectFilters(cInfo, cfgSnap, l); err != nil {
return nil
}
}
// Always apply TLS certificates
if err := s.injectConnectTLSOnFilterChains(cInfo, cfgSnap, l); err != nil {
return nil
}
return nil
}
func (s *Server) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSnapshot, cluster string, path structs.ExposePath) (proto.Message, error) {
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
@ -973,6 +1179,109 @@ func (s *Server) makeMeshGatewayListener(name, addr string, port int, cfgSnap *p
return l, nil
}
func (s *Server) makeUpstreamFilterChainForDiscoveryChain(
id string,
overrideCluster string,
protocol string,
u *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
tlsContext *envoy_tls_v3.DownstreamTlsContext,
) (*envoy_listener_v3.FilterChain, error) {
// TODO (freddy) Make this actually legible
useRDS := true
var (
clusterName string
destination, datacenter, namespace string
)
if chain != nil {
destination, datacenter, namespace = chain.ServiceName, chain.Datacenter, chain.Namespace
}
if (chain == nil || chain.IsDefault()) && u != nil {
useRDS = false
if datacenter == "" {
datacenter = u.Datacenter
}
if datacenter == "" {
datacenter = cfgSnap.Datacenter
}
if destination == "" {
destination = u.DestinationName
}
if namespace == "" {
namespace = u.DestinationNamespace
}
sni := connect.UpstreamSNI(u, "", datacenter, cfgSnap.Roots.TrustDomain)
clusterName = CustomizeClusterName(sni, chain)
} else {
if protocol == "tcp" && chain != nil {
useRDS = false
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName)
}
if startNode.Type != structs.DiscoveryGraphNodeTypeResolver {
return nil, fmt.Errorf("unexpected first node in discovery chain using protocol=%q: %s", protocol, startNode.Type)
}
targetID := startNode.Resolver.Target
target := chain.Targets[targetID]
clusterName = CustomizeClusterName(target.Name, chain)
}
}
// Default the namespace to match how SNIs are generated
if namespace == "" {
namespace = structs.IntentionDefaultNamespace
}
filterName := fmt.Sprintf("%s.%s.%s", destination, namespace, datacenter)
if u != nil && u.DestinationType == structs.UpstreamDestTypePreparedQuery {
// Avoid encoding dc and namespace for prepared queries.
// Those are defined in the query itself and are not available here.
filterName = id
}
if overrideCluster != "" {
useRDS = false
clusterName = overrideCluster
filterName = overrideCluster
}
opts := listenerFilterOpts{
useRDS: useRDS,
protocol: protocol,
filterName: filterName,
routeName: id,
cluster: clusterName,
statPrefix: "upstream.",
routePath: "",
ingressGateway: false,
httpAuthzFilter: nil,
}
filter, err := makeListenerFilter(opts)
if err != nil {
return nil, err
}
transportSocket, err := makeDownstreamTLSTransportSocket(tlsContext)
if err != nil {
return nil, err
}
return &envoy_listener_v3.FilterChain{
Filters: []*envoy_listener_v3.Filter{
filter,
},
TransportSocket: transportSocket,
}, nil
}
// TODO(freddy) Replace in favor of new function above. Currently in use for ingress gateways.
func (s *Server) makeUpstreamListenerForDiscoveryChain(
u *structs.Upstream,
address string,
@ -986,7 +1295,7 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
upstreamID := u.Identifier()
l := makeListener(upstreamID, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
cfg := getAndModifyUpstreamConfigForListener(s.Logger, u, chain)
cfg := getAndModifyUpstreamConfigForListener(s.Logger, upstreamID, u, chain)
if cfg.EnvoyListenerJSON != "" {
return makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
}
@ -1071,48 +1380,51 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
return l, nil
}
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, id string, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
var (
cfg structs.UpstreamConfig
err error
)
configMap := make(map[string]interface{})
if u != nil {
configMap = u.Config
}
if chain == nil || chain.IsDefault() {
cfg, err = structs.ParseUpstreamConfig(u.Config)
cfg, err = structs.ParseUpstreamConfig(configMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
logger.Warn("failed to parse", "upstream", id, "error", err)
}
} else {
// Use NoDefaults here so that we can set the protocol to the chain
// protocol if necessary
cfg, err = structs.ParseUpstreamConfigNoDefaults(u.Config)
cfg, err = structs.ParseUpstreamConfigNoDefaults(configMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
logger.Warn("failed to parse", "upstream", id, "error", err)
}
if cfg.EnvoyListenerJSON != "" {
logger.Warn("ignoring escape hatch setting because already configured for",
"discovery chain", chain.ServiceName, "upstream", u.Identifier(), "config", "envoy_listener_json")
"discovery chain", chain.ServiceName, "upstream", id, "config", "envoy_listener_json")
// Remove from config struct so we don't use it later on
cfg.EnvoyListenerJSON = ""
}
proto := cfg.Protocol
if proto == "" {
proto = chain.Protocol
protocol := cfg.Protocol
if protocol == "" {
protocol = chain.Protocol
}
if proto == "" {
proto = "tcp"
if protocol == "" {
protocol = "tcp"
}
// set back on the config so that we can use it from return value
cfg.Protocol = proto
cfg.Protocol = protocol
}
return cfg
@ -1127,6 +1439,7 @@ type listenerFilterOpts struct {
statPrefix string
routePath string
requestTimeoutMs *int
ingressGateway bool
httpAuthzFilter *envoy_http_v3.HttpFilter
}

View File

@ -2,22 +2,21 @@ package xds
import (
"bytes"
"path/filepath"
"sort"
"testing"
"text/template"
"time"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"path/filepath"
"sort"
"testing"
"text/template"
"time"
)
func TestListenersFromSnapshot(t *testing.T) {
@ -476,6 +475,37 @@ func TestListenersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotIngressWithTLSListener,
setup: nil,
},
{
name: "transparent-proxy",
create: proxycfg.TestConfigSnapshot,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.TransparentProxy = true
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in TransparentProxy mode
snap.ConnectProxy.DiscoveryChain["google"] = discoverychain.TestCompileConfigEntries(
t, "google", "default", "dc1",
connect.TestClusterID+".consul", "dc1", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints["google"] = map[string]structs.CheckServiceNodes{
"google.default.dc1": {
structs.CheckServiceNode{
Node: &structs.Node{
Address: "8.8.8.8",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: "google",
Port: 9090,
},
},
},
}
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(
t, "no-endpoints", "default", "dc1",
connect.TestClusterID+".consul", "dc1", nil)
},
},
}
latestEnvoyVersion := proxysupport.EnvoyVersions[0]

View File

@ -28,7 +28,7 @@ func (s *Server) routesFromSnapshot(cInfo connectionInfo, cfgSnap *proxycfg.Conf
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return routesForConnectProxy(cInfo, cfgSnap.Proxy.Upstreams, cfgSnap.ConnectProxy.DiscoveryChain)
return routesForConnectProxy(cInfo, cfgSnap.ConnectProxy.DiscoveryChain)
case structs.ServiceKindIngressGateway:
return routesForIngressGateway(cInfo, cfgSnap.IngressGateway.Upstreams, cfgSnap.IngressGateway.DiscoveryChain)
case structs.ServiceKindTerminatingGateway:
@ -42,37 +42,29 @@ func (s *Server) routesFromSnapshot(cInfo connectionInfo, cfgSnap *proxycfg.Conf
// "routes" in the snapshot.
func routesForConnectProxy(
cInfo connectionInfo,
upstreams structs.Upstreams,
chains map[string]*structs.CompiledDiscoveryChain,
) ([]proto.Message, error) {
var resources []proto.Message
for _, u := range upstreams {
upstreamID := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = chains[upstreamID]
for id, chain := range chains {
if chain.IsDefault() {
continue
}
if chain == nil || chain.IsDefault() {
// TODO(rb): make this do the old school stuff too
} else {
virtualHost, err := makeUpstreamRouteForDiscoveryChain(cInfo, upstreamID, chain, []string{"*"})
if err != nil {
return nil, err
}
route := &envoy_route_v3.RouteConfiguration{
Name: upstreamID,
VirtualHosts: []*envoy_route_v3.VirtualHost{virtualHost},
// ValidateClusters defaults to true when defined statically and false
// when done via RDS. Re-set the sane value of true to prevent
// null-routing traffic.
ValidateClusters: makeBoolValue(true),
}
resources = append(resources, route)
virtualHost, err := makeUpstreamRouteForDiscoveryChain(cInfo, id, chain, []string{"*"})
if err != nil {
return nil, err
}
route := &envoy_route_v3.RouteConfiguration{
Name: id,
VirtualHosts: []*envoy_route_v3.VirtualHost{virtualHost},
// ValidateClusters defaults to true when defined statically and false
// when done via RDS. Re-set the sane value of true to prevent
// null-routing traffic.
ValidateClusters: makeBoolValue(true),
}
resources = append(resources, route)
}
// TODO(rb): make sure we don't generate an empty result

View File

@ -52,6 +52,9 @@ const (
// PublicListenerName is the name we give the public listener in Envoy config.
PublicListenerName = "public_listener"
// OutboundListenerName is the name we give the outbound Envoy listener when TransparentProxy mode is enabled.
OutboundListenerName = "outbound_listener"
// LocalAppClusterName is the name we give the local application "cluster" in
// Envoy config. Note that all cluster names may collide with service names
// since we want cluster names and service names to match to enable nice
@ -80,6 +83,12 @@ const (
// services named "local_agent" in the future.
LocalAgentClusterName = "local_agent"
// OriginalDestinationClusterName is the name we give to the passthrough
// cluster which redirects transparently-proxied requests to their original
// destination. This cluster prevents Consul from blocking connections to
// destinations outside of the catalog when in TransparentProxy mode.
OriginalDestinationClusterName = "original-destination"
// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time.Minute

View File

@ -0,0 +1,139 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"outlierDetection": {
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"sni": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"outlierDetection": {
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"sni": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "local_app",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "local_app",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8080
}
}
}
}
]
}
]
}
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "original-destination",
"type": "ORIGINAL_DST",
"connectTimeout": "5s",
"lbPolicy": "CLUSTER_PROVIDED"
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,139 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"altStatName": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V2"
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"outlierDetection": {
},
"commonLbConfig": {
"healthyPanicThreshold": {
}
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"sni": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V2"
}
},
"connectTimeout": "5s",
"circuitBreakers": {
},
"outlierDetection": {
},
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.api.v2.auth.UpstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"sni": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
},
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "local_app",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "local_app",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8080
}
}
}
}
]
}
]
}
},
{
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
"name": "original-destination",
"type": "ORIGINAL_DST",
"connectTimeout": "5s",
"lbPolicy": "CLUSTER_PROVIDED"
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,169 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "db:127.0.0.1:9191",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 9191
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.db.default.dc1",
"cluster": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "outbound_listener:127.0.0.1:15001",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 15001
}
},
"filterChains": [
{
"filterChainMatch": {
"prefixRanges": [
{
"addressPrefix": "8.8.8.8",
"prefixLen": 32
}
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.google.default.dc1",
"cluster": "google.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.original-destination",
"cluster": "original-destination"
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.original_dst"
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "prepared_query:geo-cache:127.10.10.10:8181",
"address": {
"socketAddress": {
"address": "127.10.10.10",
"portValue": 8181
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "upstream.prepared_query_geo-cache",
"cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "public_listener:0.0.0.0:9999",
"address": {
"socketAddress": {
"address": "0.0.0.0",
"portValue": 9999
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.rbac",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.rbac.v3.RBAC",
"rules": {
},
"statPrefix": "connect_authz"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "public_listener",
"cluster": "local_app"
}
}
],
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"requireClientCertificate": true
}
}
}
],
"trafficDirection": "INBOUND"
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}

View File

@ -0,0 +1,169 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "db:127.0.0.1:9191",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 9191
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
"statPrefix": "upstream.db.default.dc1",
"cluster": "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "outbound_listener:127.0.0.1:15001",
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 15001
}
},
"filterChains": [
{
"filterChainMatch": {
"prefixRanges": [
{
"addressPrefix": "8.8.8.8",
"prefixLen": 32
}
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
"statPrefix": "upstream.google.default.dc1",
"cluster": "google.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
"statPrefix": "upstream.original-destination",
"cluster": "original-destination"
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.original_dst"
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "prepared_query:geo-cache:127.10.10.10:8181",
"address": {
"socketAddress": {
"address": "127.10.10.10",
"portValue": 8181
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
"statPrefix": "upstream.prepared_query_geo-cache",
"cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.api.v2.Listener",
"name": "public_listener:0.0.0.0:9999",
"address": {
"socketAddress": {
"address": "0.0.0.0",
"portValue": 9999
}
},
"filterChains": [
{
"filters": [
{
"name": "envoy.filters.network.rbac",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.rbac.v2.RBAC",
"rules": {
},
"statPrefix": "connect_authz"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy",
"statPrefix": "public_listener",
"cluster": "local_app"
}
}
],
"transportSocket": {
"name": "tls",
"typedConfig": {
"@type": "type.googleapis.com/envoy.api.v2.auth.DownstreamTlsContext",
"commonTlsContext": {
"tlsParams": {
},
"tlsCertificates": [
{
"certificateChain": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICjDCCAjKgAwIBAgIIC5llxGV1gB8wCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowDjEMMAoG\nA1UEAxMDd2ViMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEADPv1RHVNRfa2VKR\nAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Favq5E0ivpNtv1QnFhxtPd7d5k4e+T7\nSkW1TaOCAXIwggFuMA4GA1UdDwEB/wQEAwIDuDAdBgNVHSUEFjAUBggrBgEFBQcD\nAgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADBoBgNVHQ4EYQRfN2Q6MDc6ODc6M2E6\nNDA6MTk6NDc6YzM6NWE6YzA6YmE6NjI6ZGY6YWY6NGI6ZDQ6MDU6MjU6NzY6M2Q6\nNWE6OGQ6MTY6OGQ6Njc6NWU6MmU6YTA6MzQ6N2Q6ZGM6ZmYwagYDVR0jBGMwYYBf\nZDE6MTE6MTE6YWM6MmE6YmE6OTc6YjI6M2Y6YWM6N2I6YmQ6ZGE6YmU6YjE6OGE6\nZmM6OWE6YmE6YjU6YmM6ODM6ZTc6NWU6NDE6NmY6ZjI6NzM6OTU6NTg6MGM6ZGIw\nWQYDVR0RBFIwUIZOc3BpZmZlOi8vMTExMTExMTEtMjIyMi0zMzMzLTQ0NDQtNTU1\nNTU1NTU1NTU1LmNvbnN1bC9ucy9kZWZhdWx0L2RjL2RjMS9zdmMvd2ViMAoGCCqG\nSM49BAMCA0gAMEUCIGC3TTvvjj76KMrguVyFf4tjOqaSCRie3nmHMRNNRav7AiEA\npY0heYeK9A6iOLrzqxSerkXXQyj5e9bE4VgUnxgPU6g=\n-----END CERTIFICATE-----\n"
},
"privateKey": {
"inlineString": "-----BEGIN EC PRIVATE KEY-----\nMHcCAQEEIMoTkpRggp3fqZzFKh82yS4LjtJI+XY+qX/7DefHFrtdoAoGCCqGSM49\nAwEHoUQDQgAEADPv1RHVNRfa2VKRAB16b6rZnEt7tuhaxCFpQXPj7M2omb0B9Fav\nq5E0ivpNtv1QnFhxtPd7d5k4e+T7SkW1TQ==\n-----END EC PRIVATE KEY-----\n"
}
}
],
"validationContext": {
"trustedCa": {
"inlineString": "-----BEGIN CERTIFICATE-----\nMIICXDCCAgKgAwIBAgIICpZq70Z9LyUwCgYIKoZIzj0EAwIwFDESMBAGA1UEAxMJ\nVGVzdCBDQSAyMB4XDTE5MDMyMjEzNTgyNloXDTI5MDMyMjEzNTgyNlowFDESMBAG\nA1UEAxMJVGVzdCBDQSAyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIhywH1gx\nAsMwuF3ukAI5YL2jFxH6Usnma1HFSfVyxbXX1/uoZEYrj8yCAtdU2yoHETyd+Zx2\nThhRLP79pYegCaOCATwwggE4MA4GA1UdDwEB/wQEAwIBhjAPBgNVHRMBAf8EBTAD\nAQH/MGgGA1UdDgRhBF9kMToxMToxMTphYzoyYTpiYTo5NzpiMjozZjphYzo3Yjpi\nZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1ZTo0MTo2ZjpmMjo3\nMzo5NTo1ODowYzpkYjBqBgNVHSMEYzBhgF9kMToxMToxMTphYzoyYTpiYTo5Nzpi\nMjozZjphYzo3YjpiZDpkYTpiZTpiMTo4YTpmYzo5YTpiYTpiNTpiYzo4MzplNzo1\nZTo0MTo2ZjpmMjo3Mzo5NTo1ODowYzpkYjA/BgNVHREEODA2hjRzcGlmZmU6Ly8x\nMTExMTExMS0yMjIyLTMzMzMtNDQ0NC01NTU1NTU1NTU1NTUuY29uc3VsMAoGCCqG\nSM49BAMCA0gAMEUCICOY0i246rQHJt8o8Oya0D5PLL1FnmsQmQqIGCi31RwnAiEA\noR5f6Ku+cig2Il8T8LJujOp2/2A72QcHZA57B13y+8o=\n-----END CERTIFICATE-----\n"
}
}
},
"requireClientCertificate": true
}
}
}
],
"trafficDirection": "INBOUND"
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.Listener",
"nonce": "00000001"
}