Merge pull request #9976 from hashicorp/centralized-upstream-fixups

pull/9910/head
Freddy 4 years ago committed by GitHub
commit a02245b75a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -410,13 +410,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
upstreamIDs := args.UpstreamIDs
legacyUpstreams := false
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
if len(upstreamIDs) == 0 {
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.TransparentProxy
// will never be true because the service config request does not use the resolved value.
tproxy = args.TransparentProxy || reply.TransparentProxy
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only returned the resolved config if the proxy has TransparentProxy mode enabled.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return nil
}
// The request is considered legacy if the deprecated args.Upstream was used
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
legacyUpstreams = true
upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams {
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
}
@ -436,6 +452,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
}
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})
var (
upstreamDefaults *structs.UpstreamConfig
upstreamConfigs map[string]*structs.UpstreamConfig
@ -443,15 +462,20 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
if serviceConf != nil && serviceConf.Connect != nil {
if serviceConf.Connect.UpstreamDefaults != nil {
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
usConfigs[wildcard] = cfgMap
}
if serviceConf.Connect.UpstreamConfigs != nil {
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
}
}
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})

@ -1000,6 +1000,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMeta())
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMeta())
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
tt := []struct {
name string
@ -1126,6 +1127,14 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
@ -1188,6 +1197,19 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
"protocol": "udp",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
"protocol": "http",
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
@ -1204,6 +1226,130 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
},
},
},
{
name: "without upstream args we should return centralized config with tproxy arg",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",
TransparentProxy: true,
// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
},
},
},
{
name: "without upstream args we should return centralized config with tproxy default",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},
// TransparentProxy on the config entry but not the config request
TransparentProxy: true,
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",
// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{
TransparentProxy: true,
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
{
Upstream: mysql,
Config: map[string]interface{}{
"protocol": "grpc",
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
},
},
},
},
{
name: "without upstream args we should NOT return centralized config outside tproxy mode",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "api",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
mysql.String(): {
Protocol: "grpc",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "api",
Datacenter: "dc1",
TransparentProxy: false,
// Empty Upstreams/UpstreamIDs
},
expect: structs.ServiceConfigResponse{},
},
}
for _, tc := range tt {

@ -333,6 +333,12 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
for i := range s.proxyCfg.Upstreams {
u := s.proxyCfg.Upstreams[i]
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
if u.DestinationName == structs.WildcardSpecifier {
snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
continue
}
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
// Watches should not be created for them.
if u.CentrallyConfigured {
@ -795,6 +801,17 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
if ok {
cfgMap = u.Config
} else {
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
// by the ResolveServiceConfig endpoint.
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
if ok {
u = defaults
cfgMap = defaults.Config
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
}
}
cfg, err := parseReducedUpstreamConfig(cfgMap)
@ -808,7 +825,18 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
)
}
err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
meshGateway := s.proxyCfg.MeshGateway
if u != nil {
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
}
watchOpts := discoveryChainWatchOpts{
id: svc.String(),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
cfg: cfg,
meshGateway: meshGateway,
}
err = s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
}
@ -1592,7 +1620,12 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
for _, service := range services.Services {
u := makeUpstream(service)
err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
}
err := s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
@ -1642,8 +1675,16 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
return upstream
}
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
type discoveryChainWatchOpts struct {
id string
name string
namespace string
cfg reducedUpstreamConfig
meshGateway structs.MeshGatewayConfig
}
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
return nil
}
@ -1651,12 +1692,13 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: name,
Name: opts.name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: namespace,
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+id, s.ch)
EvaluateInNamespace: opts.namespace,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
OverrideMeshGateway: opts.meshGateway,
}, "discovery-chain:"+opts.id, s.ch)
if err != nil {
cancel()
return err
@ -1664,9 +1706,9 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
switch s.kind {
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
default:
cancel()
return fmt.Errorf("unsupported kind %s", s.kind)

@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
@ -1606,6 +1607,17 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: true,
Upstreams: structs.Upstreams{
{
CentrallyConfigured: true,
DestinationName: structs.WildcardSpecifier,
DestinationNamespace: structs.WildcardSpecifier,
Config: map[string]interface{}{
"connect_timeout_ms": 6000,
},
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
},
},
},
sourceDC: "dc1",
@ -1622,10 +1634,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.ConnectProxy.IsEmpty())
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
// Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them
require.Len(t, snap.ConnectProxy.UpstreamConfig, 1)
require.Contains(t, snap.ConnectProxy.UpstreamConfig, "*")
},
},
// Valid snapshot after roots, leaf, and intentions
@ -1694,16 +1709,25 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Should not have results yet
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)
cfg, ok := snap.ConnectProxy.UpstreamConfig[db.String()]
require.True(t, ok)
// Upstream config should have been inherited from defaults under wildcard key
require.Equal(t, cfg.Config["connect_timeout_ms"], 6000)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + dbStr: genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
Name: "db",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideConnectTimeout: 6 * time.Second,
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
}),
},
events: []cache.UpdateEvent{

@ -250,7 +250,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, u
}
}
// handleUpdate receives an update event the global config defaults, updates
// handleUpdate receives an update event from the global config defaults, updates
// the local state and re-registers the service with the newly merged config.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
@ -335,12 +335,13 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
}
req := &structs.ServiceConfigRequest{
Name: name,
Datacenter: bd.RuntimeConfig.Datacenter,
QueryOptions: structs.QueryOptions{Token: addReq.token},
MeshGateway: ns.Proxy.MeshGateway,
UpstreamIDs: upstreams,
EnterpriseMeta: ns.EnterpriseMeta,
Name: name,
Datacenter: bd.RuntimeConfig.Datacenter,
QueryOptions: structs.QueryOptions{Token: addReq.token},
MeshGateway: ns.Proxy.MeshGateway,
TransparentProxy: ns.Proxy.TransparentProxy,
UpstreamIDs: upstreams,
EnterpriseMeta: ns.EnterpriseMeta,
}
if req.QueryOptions.Token == "" {
req.QueryOptions.Token = bd.Tokens.AgentToken()
@ -436,8 +437,8 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
}
// Ensure upstreams present in central config are represented in the local configuration.
// This does not apply outside of TransparentProxy mode because in that situation every upstream needs to be defined
// explicitly and locally with a local bind port.
// This does not apply outside of TransparentProxy mode because in that situation every possible upstream already exists
// inside of ns.Proxy.Upstreams.
if ns.Proxy.TransparentProxy {
for id, remote := range remoteUpstreams {
if _, ok := localUpstreams[id]; ok {

@ -194,7 +194,9 @@ func (cfg *ConnectConfiguration) Normalize() {
v.Normalize()
}
cfg.UpstreamDefaults.Normalize()
if cfg.UpstreamDefaults != nil {
cfg.UpstreamDefaults.Normalize()
}
}
func (cfg ConnectConfiguration) Validate() error {
@ -206,8 +208,11 @@ func (cfg ConnectConfiguration) Validate() error {
}
}
if err := cfg.UpstreamDefaults.Validate(); err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
if cfg.UpstreamDefaults != nil {
err := cfg.UpstreamDefaults.Validate()
if err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
}
}
return validationErr
@ -590,6 +595,9 @@ type ServiceConfigRequest struct {
// MeshGateway contains the mesh gateway configuration from the requesting proxy's registration
MeshGateway MeshGatewayConfig
// TransparentProxy indicates whether the requesting proxy is in transparent proxy mode
TransparentProxy bool
UpstreamIDs []ServiceID
// DEPRECATED

@ -333,6 +333,9 @@ func (u *Upstream) Validate() error {
if u.DestinationName == "" {
return fmt.Errorf("upstream destination name cannot be empty")
}
if u.DestinationName == WildcardSpecifier && !u.CentrallyConfigured {
return fmt.Errorf("upstream destination name cannot be a wildcard")
}
if u.LocalBindPort == 0 && !u.CentrallyConfigured {
return fmt.Errorf("upstream local bind port cannot be zero")

@ -1153,6 +1153,11 @@ func (s *NodeService) Validate() error {
"Proxy.DestinationServiceName must be non-empty for Connect proxy "+
"services"))
}
if s.Proxy.DestinationServiceName == WildcardSpecifier {
result = multierror.Append(result, fmt.Errorf(
"Proxy.DestinationServiceName must not be a wildcard for Connect proxy "+
"services"))
}
if s.Port == 0 {
result = multierror.Append(result, fmt.Errorf(
@ -1189,7 +1194,9 @@ func (s *NodeService) Validate() error {
}
addr = net.JoinHostPort(addr, fmt.Sprintf("%d", u.LocalBindPort))
if _, ok := bindAddrs[addr]; ok {
// Centrally configured upstreams will fail this check if there are multiple because they do not have an address/port.
// Only consider non-centrally configured upstreams in this check since those are the ones we create listeners for.
if _, ok := bindAddrs[addr]; ok && !u.CentrallyConfigured {
result = multierror.Append(result, fmt.Errorf(
"upstreams cannot contain duplicates by local bind address and port; %q is specified twice", addr))
continue

@ -648,6 +648,12 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
"Proxy.DestinationServiceName must be",
},
{
"connect-proxy: wildcard Proxy.DestinationServiceName",
func(x *NodeService) { x.Proxy.DestinationServiceName = "*" },
"Proxy.DestinationServiceName must not be",
},
{
"connect-proxy: valid Proxy.DestinationServiceName",
func(x *NodeService) { x.Proxy.DestinationServiceName = "hello" },
@ -697,6 +703,28 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
},
"upstream destination name cannot be empty",
},
{
"connect-proxy: upstream wildcard name",
func(x *NodeService) {
x.Proxy.Upstreams = Upstreams{{
DestinationType: UpstreamDestTypeService,
DestinationName: WildcardSpecifier,
LocalBindPort: 5000,
}}
},
"upstream destination name cannot be a wildcard",
},
{
"connect-proxy: upstream can have wildcard name when centrally configured",
func(x *NodeService) {
x.Proxy.Upstreams = Upstreams{{
DestinationType: UpstreamDestTypeService,
DestinationName: WildcardSpecifier,
CentrallyConfigured: true,
}}
},
"",
},
{
"connect-proxy: upstream empty bind port",
func(x *NodeService) {
@ -768,6 +796,24 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
},
"upstreams cannot contain duplicates",
},
{
"connect-proxy: Centrally configured upstreams can have duplicate ip/port",
func(x *NodeService) {
x.Proxy.Upstreams = Upstreams{
{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
CentrallyConfigured: true,
},
{
DestinationType: UpstreamDestTypeService,
DestinationName: "bar",
CentrallyConfigured: true,
},
}
},
"",
},
{
"connect-proxy: Upstreams duplicated by ip and port",
func(x *NodeService) {

@ -98,7 +98,7 @@ type ConnectConfiguration struct {
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
// UpstreamDefaults contains default configuration for all upstreams of a given service
UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
}
type UpstreamConfig struct {

@ -393,7 +393,7 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{Mode: "remote"},
},
},
UpstreamDefaults: UpstreamConfig{
UpstreamDefaults: &UpstreamConfig{
EnvoyClusterJSON: "zip",
EnvoyListenerJSON: "zop",
Protocol: "http",

@ -638,7 +638,7 @@ func TestParseConfigEntry(t *testing.T) {
},
},
},
UpstreamDefaults: api.UpstreamConfig{
UpstreamDefaults: &api.UpstreamConfig{
EnvoyClusterJSON: "zip",
EnvoyListenerJSON: "zop",
Protocol: "http",

Loading…
Cancel
Save