mirror of https://github.com/hashicorp/consul
Fixup discovery chain handling in transparent mode (#10168)
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com> Previously we would associate the address of a discovery chain target with the discovery chain's filter chain. This was broken for a few reasons: - If the upstream is a virtual service, the client proxy has no way of dialing it because virtual services are not targets of their discovery chains. The targets are distinct services. This is addressed by watching the endpoints of all upstream services, not just their discovery chain targets. - If multiple discovery chains resolve to the same target, that would lead to multiple filter chains attempting to match on the target's virtual IP. This is addressed by only matching on the upstream's virtual IP. NOTE: this implementation requires an intention to the redirecting virtual service and not just to the final destination. This is how we can know that the virtual service is an upstream to watch. A later PR will look into traversing discovery chains when computing upstreams so that intentions are only required to the discovery chain targets.pull/10161/head
parent
3ad754ca7b
commit
ed1082510d
|
@ -231,26 +231,6 @@ func (s *state) watchMeshGateway(ctx context.Context, dc string, upstreamID stri
|
|||
}, "mesh-gateway:"+dc+":"+upstreamID, s.ch)
|
||||
}
|
||||
|
||||
func (s *state) watchConnectProxyService(ctx context.Context, correlationId string, service string, dc string, filter string, entMeta *structs.EnterpriseMeta) error {
|
||||
var finalMeta structs.EnterpriseMeta
|
||||
finalMeta.Merge(entMeta)
|
||||
|
||||
return s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
Filter: filter,
|
||||
},
|
||||
ServiceName: service,
|
||||
Connect: true,
|
||||
// Note that Identifier doesn't type-prefix for service any more as it's
|
||||
// the default and makes metrics and other things much cleaner. It's
|
||||
// simpler for us if we have the type to make things unambiguous.
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: finalMeta,
|
||||
}, correlationId, s.ch)
|
||||
}
|
||||
|
||||
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
|
||||
// state.
|
||||
func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||
|
@ -1019,14 +999,29 @@ func (s *state) resetWatchesFromChain(
|
|||
cancelFn()
|
||||
}
|
||||
|
||||
needGateways := make(map[string]struct{})
|
||||
var (
|
||||
watchedChainEndpoints bool
|
||||
needGateways = make(map[string]struct{})
|
||||
)
|
||||
|
||||
chainID := chain.ID()
|
||||
for _, target := range chain.Targets {
|
||||
s.logger.Trace("initializing watch of target",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"target", target.ID,
|
||||
"mesh-gateway-mode", target.MeshGateway.Mode,
|
||||
)
|
||||
if target.ID == chainID {
|
||||
watchedChainEndpoints = true
|
||||
}
|
||||
|
||||
opts := targetWatchOpts{
|
||||
upstreamID: id,
|
||||
chainID: target.ID,
|
||||
service: target.Service,
|
||||
filter: target.Subset.Filter,
|
||||
datacenter: target.Datacenter,
|
||||
entMeta: target.GetEnterpriseMetadata(),
|
||||
}
|
||||
err := s.watchUpstreamTarget(snap, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
|
||||
}
|
||||
|
||||
// We'll get endpoints from the gateway query, but the health still has
|
||||
// to come from the backing service query.
|
||||
|
@ -1036,22 +1031,31 @@ func (s *state) resetWatchesFromChain(
|
|||
case structs.MeshGatewayModeLocal:
|
||||
needGateways[s.source.Datacenter] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
err := s.watchConnectProxyService(
|
||||
ctx,
|
||||
"upstream-target:"+target.ID+":"+id,
|
||||
target.Service,
|
||||
target.Datacenter,
|
||||
target.Subset.Filter,
|
||||
target.GetEnterpriseMetadata(),
|
||||
)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
// If the discovery chain's targets do not lead to watching all endpoints
|
||||
// for the upstream, then create a separate watch for those too.
|
||||
// This is needed in transparent mode because if there is some service A that
|
||||
// redirects to service B, the dialing proxy needs to associate A's virtual IP
|
||||
// with A's discovery chain.
|
||||
//
|
||||
// Outside of transparent mode we only watch the chain target, B,
|
||||
// since A is a virtual service and traffic will not be sent to it.
|
||||
if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
||||
chainEntMeta := structs.NewEnterpriseMeta(chain.Namespace)
|
||||
|
||||
opts := targetWatchOpts{
|
||||
upstreamID: id,
|
||||
chainID: chainID,
|
||||
service: chain.ServiceName,
|
||||
filter: "",
|
||||
datacenter: chain.Datacenter,
|
||||
entMeta: &chainEntMeta,
|
||||
}
|
||||
err := s.watchUpstreamTarget(snap, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
|
||||
}
|
||||
|
||||
snap.WatchedUpstreams[id][target.ID] = cancel
|
||||
}
|
||||
|
||||
for dc := range needGateways {
|
||||
|
@ -1092,6 +1096,52 @@ func (s *state) resetWatchesFromChain(
|
|||
return nil
|
||||
}
|
||||
|
||||
type targetWatchOpts struct {
|
||||
upstreamID string
|
||||
chainID string
|
||||
service string
|
||||
filter string
|
||||
datacenter string
|
||||
entMeta *structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func (s *state) watchUpstreamTarget(snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
||||
s.logger.Trace("initializing watch of target",
|
||||
"upstream", opts.upstreamID,
|
||||
"chain", opts.service,
|
||||
"target", opts.chainID,
|
||||
)
|
||||
|
||||
var finalMeta structs.EnterpriseMeta
|
||||
finalMeta.Merge(opts.entMeta)
|
||||
|
||||
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
|
||||
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: opts.datacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
Filter: opts.filter,
|
||||
},
|
||||
ServiceName: opts.service,
|
||||
Connect: true,
|
||||
// Note that Identifier doesn't type-prefix for service any more as it's
|
||||
// the default and makes metrics and other things much cleaner. It's
|
||||
// simpler for us if we have the type to make things unambiguous.
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: finalMeta,
|
||||
}, correlationID, s.ch)
|
||||
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.WatchedUpstreams[opts.upstreamID][opts.chainID] = cancel
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
|
|
|
@ -1802,6 +1802,45 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
)
|
||||
},
|
||||
},
|
||||
// Discovery chain updates should be stored
|
||||
{
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
"discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
|
||||
Name: "db",
|
||||
EvaluateInDatacenter: "dc1",
|
||||
EvaluateInNamespace: "default",
|
||||
Datacenter: "dc1",
|
||||
OverrideConnectTimeout: 6 * time.Second,
|
||||
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||
}),
|
||||
},
|
||||
events: []cache.UpdateEvent{
|
||||
{
|
||||
CorrelationID: "discovery-chain:db",
|
||||
Result: &structs.DiscoveryChainResponse{
|
||||
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1", nil,
|
||||
&structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "db",
|
||||
Redirect: &structs.ServiceResolverRedirect{
|
||||
Service: "mysql",
|
||||
},
|
||||
},
|
||||
),
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
|
||||
require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 2)
|
||||
|
||||
// In transparent mode we watch the upstream's endpoints even if the upstream is not a target of its chain.
|
||||
// This will happen in cases like redirects.
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "db.default.dc1")
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "mysql.default.dc1")
|
||||
},
|
||||
},
|
||||
// Empty list of upstreams should clean everything up
|
||||
{
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
|
|
|
@ -88,6 +88,12 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
|
|||
return target.Service == c.ServiceName && target.Namespace == c.Namespace
|
||||
}
|
||||
|
||||
// ID returns an ID that encodes the service, namespace, and datacenter.
|
||||
// This ID allows us to compare a discovery chain target to the chain upstream itself.
|
||||
func (c *CompiledDiscoveryChain) ID() string {
|
||||
return chainID("", c.ServiceName, c.Namespace, c.Datacenter)
|
||||
}
|
||||
|
||||
const (
|
||||
DiscoveryGraphNodeTypeRouter = "router"
|
||||
DiscoveryGraphNodeTypeSplitter = "splitter"
|
||||
|
@ -229,13 +235,16 @@ func NewDiscoveryTarget(service, serviceSubset, namespace, datacenter string) *D
|
|||
return t
|
||||
}
|
||||
|
||||
func (t *DiscoveryTarget) setID() {
|
||||
func chainID(subset, service, namespace, dc string) string {
|
||||
// NOTE: this format is similar to the SNI syntax for simplicity
|
||||
if t.ServiceSubset == "" {
|
||||
t.ID = fmt.Sprintf("%s.%s.%s", t.Service, t.Namespace, t.Datacenter)
|
||||
} else {
|
||||
t.ID = fmt.Sprintf("%s.%s.%s.%s", t.ServiceSubset, t.Service, t.Namespace, t.Datacenter)
|
||||
if subset == "" {
|
||||
return fmt.Sprintf("%s.%s.%s", service, namespace, dc)
|
||||
}
|
||||
return fmt.Sprintf("%s.%s.%s.%s", subset, service, namespace, dc)
|
||||
}
|
||||
|
||||
func (t *DiscoveryTarget) setID() {
|
||||
t.ID = chainID(t.ServiceSubset, t.Service, t.Namespace, t.Datacenter)
|
||||
}
|
||||
|
||||
func (t *DiscoveryTarget) String() string {
|
||||
|
|
|
@ -133,6 +133,9 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
continue
|
||||
}
|
||||
|
||||
// The rest of this loop is used exclusively for transparent proxies.
|
||||
// Below we create a filter chain per upstream, rather than a listener per upstream
|
||||
// as we do for explicit upstreams above.
|
||||
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
|
||||
id,
|
||||
"",
|
||||
|
@ -146,19 +149,21 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
return nil, err
|
||||
}
|
||||
|
||||
endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
|
||||
endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id][chain.ID()]
|
||||
uniqueAddrs := make(map[string]struct{})
|
||||
|
||||
for _, t := range chain.Targets {
|
||||
// Match on the virtual IP for the upstream service.
|
||||
// We do not match on all endpoints here since it would lead to load balancing across
|
||||
// all instances when any instance address is dialed.
|
||||
for _, e := range endpoints[t.ID] {
|
||||
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
|
||||
uniqueAddrs[vip.Address] = struct{}{}
|
||||
}
|
||||
// Match on the virtual IP for the upstream service (identified by the chain's ID).
|
||||
// We do not match on all endpoints here since it would lead to load balancing across
|
||||
// all instances when any instance address is dialed.
|
||||
for _, e := range endpoints {
|
||||
if vip := e.Service.TaggedAddresses[virtualIPTag]; vip.Address != "" {
|
||||
uniqueAddrs[vip.Address] = struct{}{}
|
||||
}
|
||||
}
|
||||
if len(uniqueAddrs) > 1 {
|
||||
s.Logger.Warn("detected multiple virtual IPs for an upstream, all will be used to match traffic",
|
||||
"upstream", id)
|
||||
}
|
||||
|
||||
// For every potential address we collected, create the appropriate address prefix to match on.
|
||||
// In this case we are matching on exact addresses, so the prefix is the address itself,
|
||||
|
|
|
@ -508,6 +508,22 @@ func TestListenersFromSnapshot(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
// Other targets of the discovery chain should be ignored.
|
||||
// We only match on the upstream's virtual IP, not the IPs of other targets.
|
||||
"google-v2.default.dc1": {
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Address: "7.7.7.7",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "google-v2",
|
||||
TaggedAddresses: map[string]structs.ServiceAddress{
|
||||
"virtual": {Address: "10.10.10.10"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
|
||||
|
|
Loading…
Reference in New Issue