proxycfg: Ensure that endpoints for explicit upstreams in other datacenters are watched in transparent mode (#10391)

Co-authored-by: Freddy Vallenilla <freddy@hashicorp.com>
pull/10407/head
Nitya Dhanushkodi 2021-06-15 11:00:26 -07:00 committed by GitHub
parent c8ba2d40fd
commit b8b44419a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 210 additions and 6 deletions

3
.changelog/10391.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
proxycfg: Ensure that endpoints for explicit upstreams in other datacenters are watched in transparent mode.
```

View File

@ -380,7 +380,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
return err
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
default:
@ -815,6 +815,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
id: svc.String(),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
datacenter: s.source.Datacenter,
cfg: cfg,
meshGateway: meshGateway,
}
@ -826,26 +827,46 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
// Clean up data from services that were not in the update
for sn := range snap.ConnectProxy.WatchedUpstreams {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreams, sn)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGateways {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGateways, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
@ -1721,9 +1742,10 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
u := makeUpstream(service)
watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
datacenter: s.source.Datacenter,
}
err := s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
@ -1781,6 +1803,7 @@ type discoveryChainWatchOpts struct {
id string
name string
namespace string
datacenter string
cfg reducedUpstreamConfig
meshGateway structs.MeshGatewayConfig
}
@ -1795,7 +1818,7 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWat
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: opts.name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInDatacenter: opts.datacenter,
EvaluateInNamespace: opts.namespace,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),

View File

@ -3,11 +3,12 @@ package proxycfg
import (
"context"
"fmt"
"github.com/hashicorp/consul/agent/connect"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
@ -362,6 +363,10 @@ func ingressConfigWatchEvent(tlsEnabled bool) cache.UpdateEvent {
}
}
func upstreamIDForDC2(name string) string {
return fmt.Sprintf("%s?dc=dc2", name)
}
// This test is meant to exercise the various parts of the cache watching done by the state as
// well as its management of the ConfigSnapshot
//
@ -1942,6 +1947,179 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
// Receiving an empty upstreams from Intentions list shouldn't delete explicit upstream watches
"transparent-proxy-handle-update-explicit-cross-dc": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Mode: structs.ProxyModeTransparent,
Upstreams: structs.Upstreams{
{
CentrallyConfigured: true,
DestinationName: structs.WildcardSpecifier,
DestinationNamespace: structs.WildcardSpecifier,
Config: map[string]interface{}{
"connect_timeout_ms": 6000,
},
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
{
DestinationName: db.Name,
DestinationNamespace: db.NamespaceOrDefault(),
Datacenter: "dc2",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
},
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
// Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)
wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String()))
},
},
// Valid snapshot after roots, leaf, and intentions
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
TransparentProxy: structs.TransparentProxyMeshConfig{},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
require.True(t, snap.ConnectProxy.MeshConfigSet)
require.NotNil(t, snap.ConnectProxy.MeshConfig)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc2", "trustdomain.consul", "dc1",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
// Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not
// be deleted from the snapshot.
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
// Explicit upstream discovery chain watches don't get stored in these maps because they don't
// get canceled unless the proxy registration is modified.
require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains)
// Explicit upstreams should not be deleted when the empty update event happens since that is
// for intention upstreams.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String()))
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}