Add per-upstream configuration to service-defaults

pull/9877/head
Freddy 4 years ago committed by GitHub
commit c664938bae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
```release-note:improvement
connect: Allow per-upstream configuration to be set in service-defaults. [experimental]
```

@ -329,31 +329,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
reply.Reset()
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
// during the blocking query, this function will be rerun and these state store lookups
// will both be current.
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
var serviceConf *structs.ServiceConfigEntry
var ok bool
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
}
// Use the default enterprise meta to look up the global proxy defaults. In the future we may allow per-namespace proxy-defaults
// but not yet.
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
if err != nil {
return err
}
var proxyConf *structs.ProxyConfigEntry
var (
proxyConf *structs.ProxyConfigEntry
proxyConfGlobalProtocol string
ok bool
)
if proxyEntry != nil {
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
if !ok {
@ -367,11 +357,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
reply.ProxyConfig = mapCopy.(map[string]interface{})
reply.MeshGateway = proxyConf.MeshGateway
reply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index = index
if serviceConf != nil {
var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
if serviceConf.Expose.Checks {
reply.Expose.Checks = true
}
@ -389,55 +397,107 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
}
// Extract the global protocol from proxyConf for upstream configs.
var proxyConfGlobalProtocol interface{}
if proxyConf != nil && proxyConf.Config != nil {
proxyConfGlobalProtocol = proxyConf.Config["protocol"]
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
// map the legacy request structure using only service names
// to the new ServiceID type.
upstreamIDs := args.UpstreamIDs
legacyUpstreams := false
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
if len(upstreamIDs) == 0 {
legacyUpstreams = true
upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams {
upstreamIDs = append(upstreamIDs, structs.NewServiceID(upstream, &args.EnterpriseMeta))
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
}
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults
if serviceConf != nil && serviceConf.Connect != nil {
for sid := range serviceConf.Connect.UpstreamConfigs {
seenUpstreams[structs.ServiceIDFromString(sid)] = struct{}{}
}
}
var (
upstreamDefaults *structs.UpstreamConfig
upstreamConfigs map[string]*structs.UpstreamConfig
)
if serviceConf != nil && serviceConf.Connect != nil {
if serviceConf.Connect.UpstreamDefaults != nil {
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
}
if serviceConf.Connect.UpstreamConfigs != nil {
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
}
}
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})
for _, upstream := range upstreamIDs {
_, upstreamEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_defaults|upstream_configs) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
_, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
if err != nil {
return err
}
var upstreamConf *structs.ServiceConfigEntry
var ok bool
if upstreamEntry != nil {
upstreamConf, ok = upstreamEntry.(*structs.ServiceConfigEntry)
if upstreamSvcDefaults != nil {
cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", upstreamEntry)
return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults)
}
if cfg.Protocol != "" {
protocol = cfg.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Fallback to proxyConf global protocol.
protocol := proxyConfGlobalProtocol
if upstreamConf != nil && upstreamConf.Protocol != "" {
protocol = upstreamConf.Protocol
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// Nothing to configure if a protocol hasn't been set.
if protocol == nil {
continue
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_configs
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
usConfigs[upstream] = map[string]interface{}{
"protocol": protocol,
if upstreamConfigs[upstream.String()] != nil {
upstreamConfigs[upstream.String()].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
@ -447,22 +507,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
if legacyUpstreams {
if reply.UpstreamConfigs == nil {
reply.UpstreamConfigs = make(map[string]map[string]interface{})
}
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
reply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
reply.UpstreamConfigs[us.ID] = conf
}
} else {
if reply.UpstreamIDConfigs == nil {
reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
}
reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return nil
})
}

@ -2,6 +2,7 @@ package consul
import (
"os"
"sort"
"testing"
"time"
@ -892,6 +893,258 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config)
}
func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
tt := []struct {
name string
entries []structs.ConfigEntry
request structs.ServiceConfigRequest
proxyCfg structs.ConnectProxyConfig
expect structs.ServiceConfigResponse
}{
{
name: "upstream config entries from Upstreams and service-defaults",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zip": {
Protocol: "http",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
Upstreams: []string{"zap"},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "grpc",
},
UpstreamConfigs: map[string]map[string]interface{}{
"zip": {
"protocol": "http",
},
"zap": {
"protocol": "grpc",
},
},
},
},
{
name: "upstream config entries from UpstreamIDs and service-defaults",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "grpc",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zip": {
Protocol: "http",
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{{ID: "zap"}},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "grpc",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
{
Upstream: structs.ServiceID{
ID: "zip",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
{
name: "proxy registration overrides upstream_defaults",
entries: []structs.ConfigEntry{
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
{ID: "zap"},
},
},
expect: structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "none",
},
},
},
},
},
},
{
name: "upstream_configs overrides all",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "udp",
},
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "tcp",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Connect: &structs.ConnectConfiguration{
UpstreamDefaults: &structs.UpstreamConfig{
Protocol: "http",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
PassiveHealthCheck: &structs.PassiveHealthCheck{
Interval: 10,
MaxFailures: 2,
},
},
UpstreamConfigs: map[string]*structs.UpstreamConfig{
"zap": {
Protocol: "grpc",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
},
},
},
},
},
request: structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
UpstreamIDs: []structs.ServiceID{
{ID: "zap"},
},
},
expect: structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "udp",
},
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
state := s1.fsm.State()
// Boostrap the config entries
idx := uint64(1)
for _, conf := range tc.entries {
require.NoError(t, state.EnsureConfigEntry(idx, conf))
idx++
}
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &tc.request, &out))
// Don't know what this is deterministically, so we grab it from the response
tc.expect.QueryMeta = out.QueryMeta
// Order of this slice is also not deterministic since it's populated from a map
sort.SliceStable(out.UpstreamIDConfigs, func(i, j int) bool {
return out.UpstreamIDConfigs[i].Upstream.String() < out.UpstreamIDConfigs[j].Upstream.String()
})
require.Equal(t, tc.expect, out)
})
}
}
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

@ -309,8 +309,11 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
}
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
ns := addReq.Service
name := ns.Service
var (
ns = addReq.Service
name = ns.Service
)
var upstreams []structs.ServiceID
// Note that only sidecar proxies should even make it here for now although
@ -335,6 +338,7 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
Name: name,
Datacenter: bd.RuntimeConfig.Datacenter,
QueryOptions: structs.QueryOptions{Token: addReq.token},
MeshGateway: ns.Proxy.MeshGateway,
UpstreamIDs: upstreams,
EnterpriseMeta: ns.EnterpriseMeta,
}
@ -365,7 +369,6 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
return nil, err
}
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
return nil, err
}
@ -382,16 +385,27 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
continue
}
// default the upstreams gateway mode if it didn't specify one
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode
}
usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID())
if !ok {
// No config defaults to merge
continue
}
// MeshGateway mode is fetched separately since it is a first class field and not read from us.Config
parsed, err := structs.ParseUpstreamConfig(usCfg)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Identifier(), err)
}
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = parsed.MeshGateway.Mode
}
// Delete the mesh gateway key since this is the only place it is read from an opaque map.
delete(usCfg, "mesh_gateway")
// Merge in everything else that is read from the map
if err := mergo.Merge(&us.Config, usCfg); err != nil {
return nil, err
}

@ -8,11 +8,11 @@ import (
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestServiceManager_RegisterService(t *testing.T) {
@ -418,8 +418,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1,
"protocol": "http",
},
UpstreamIDConfigs: structs.UpstreamConfigs{
structs.UpstreamConfig{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
@ -459,8 +459,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1,
"protocol": "http",
},
UpstreamIDConfigs: structs.UpstreamConfigs{
structs.UpstreamConfig{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
@ -634,8 +634,8 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
"foo": 1,
"protocol": "http",
},
UpstreamIDConfigs: structs.UpstreamConfigs{
structs.UpstreamConfig{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
@ -848,3 +848,205 @@ func convertToMap(v interface{}) (map[string]interface{}, error) {
return raw, nil
}
func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "new config fields",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"protocol": "grpc",
},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "upstream mode from remote defaults overrides local default",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "mode in local upstream config overrides all",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}

@ -87,11 +87,7 @@ type ServiceConfigEntry struct {
ExternalSNI string `json:",omitempty" alias:"external_sni"`
// TODO(banks): enable this once we have upstreams supported too. Enabling
// sidecars actually makes no sense and adds complications when you don't
// allow upstreams to be specified centrally too.
//
// Connect ConnectConfiguration
Connect *ConnectConfiguration `json:",omitempty"`
Meta map[string]string `json:",omitempty"`
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@ -131,13 +127,23 @@ func (e *ServiceConfigEntry) Normalize() error {
e.Kind = ServiceDefaults
e.Protocol = strings.ToLower(e.Protocol)
e.Connect.Normalize()
e.EnterpriseMeta.Normalize()
return nil
}
func (e *ServiceConfigEntry) Validate() error {
return validateConfigEntryMeta(e.Meta)
validationErr := validateConfigEntryMeta(e.Meta)
if e.Connect != nil {
err := e.Connect.Validate()
if err != nil {
validationErr = multierror.Append(validationErr, err)
}
}
return validationErr
}
func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool {
@ -169,7 +175,38 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *EnterpriseMeta {
}
type ConnectConfiguration struct {
SidecarProxy bool
// UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
UpstreamConfigs map[string]*UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
// UpstreamDefaults contains default configuration for all upstreams of a given service
UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
}
func (cfg *ConnectConfiguration) Normalize() {
if cfg == nil {
return
}
for _, v := range cfg.UpstreamConfigs {
v.Normalize()
}
cfg.UpstreamDefaults.Normalize()
}
func (cfg ConnectConfiguration) Validate() error {
var validationErr error
for k, v := range cfg.UpstreamConfigs {
if err := v.Validate(); err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream config for %s: %v", k, err))
}
}
if err := cfg.UpstreamDefaults.Validate(); err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
}
return validationErr
}
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
@ -542,12 +579,17 @@ func (r *ConfigEntryListAllRequest) RequestDatacenter() string {
type ServiceConfigRequest struct {
Name string
Datacenter string
// MeshGateway contains the mesh gateway configuration from the requesting proxy's registration
MeshGateway MeshGatewayConfig
UpstreamIDs []ServiceID
// DEPRECATED
// Upstreams is a list of upstream service names to use for resolving the service config
// UpstreamIDs should be used instead which can encode more than just the name to
// uniquely identify a service.
Upstreams []string
UpstreamIDs []ServiceID
Upstreams []string
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
QueryOptions
@ -592,13 +634,196 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
}
type UpstreamConfig struct {
// EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
EnvoyListenerJSON string `json:",omitempty" alias:"envoy_listener_json"`
// EnvoyClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
EnvoyClusterJSON string `json:",omitempty" alias:"envoy_cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string `json:",omitempty"`
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `json:",omitempty" alias:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits *UpstreamLimits `json:",omitempty"`
// PassiveHealthCheck configuration determines how upstream proxy instances will
// be monitored for removal from the load balancing pool.
PassiveHealthCheck *PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
// MeshGatewayConfig controls how Mesh Gateways are configured and used
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
}
func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) {
// Avoid storing empty values in the map, since these can act as overrides
if cfg.EnvoyListenerJSON != "" {
dst["envoy_listener_json"] = cfg.EnvoyListenerJSON
}
if cfg.EnvoyClusterJSON != "" {
dst["envoy_cluster_json"] = cfg.EnvoyClusterJSON
}
if cfg.Protocol != "" {
dst["protocol"] = cfg.Protocol
}
if cfg.ConnectTimeoutMs != 0 {
dst["connect_timeout_ms"] = cfg.ConnectTimeoutMs
}
if !cfg.MeshGateway.IsZero() {
dst["mesh_gateway"] = cfg.MeshGateway
}
if cfg.Limits != nil {
dst["limits"] = cfg.Limits
}
if cfg.PassiveHealthCheck != nil {
dst["passive_health_check"] = cfg.PassiveHealthCheck
}
}
func (cfg *UpstreamConfig) Normalize() {
cfg.Protocol = strings.ToLower(cfg.Protocol)
if cfg.ConnectTimeoutMs < 1 {
cfg.ConnectTimeoutMs = 5000
}
}
func (cfg UpstreamConfig) Validate() error {
var validationErr error
if cfg.PassiveHealthCheck != nil {
err := cfg.PassiveHealthCheck.Validate()
if err != nil {
validationErr = multierror.Append(validationErr, err)
}
}
if cfg.Limits != nil {
err := cfg.Limits.Validate()
if err != nil {
validationErr = multierror.Append(validationErr, err)
}
}
return validationErr
}
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
var cfg UpstreamConfig
config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice,
decode.HookTranslateKeys,
mapstructure.StringToTimeDurationHookFunc(),
),
Result: &cfg,
WeaklyTypedInput: true,
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return cfg, err
}
err = decoder.Decode(m)
return cfg, err
}
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned)
cfg.Normalize()
return cfg, err
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration `json:",omitempty"`
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `json:",omitempty" alias:"max_failures"`
}
func (chk *PassiveHealthCheck) IsZero() bool {
zeroVal := PassiveHealthCheck{}
return *chk == zeroVal
}
func (chk PassiveHealthCheck) Validate() error {
if chk.Interval <= 0*time.Second {
return fmt.Errorf("passive health check interval must be greater than 0s")
}
return nil
}
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections *int `json:",omitempty" alias:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests *int `json:",omitempty" alias:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests *int `json:",omitempty" alias:"max_concurrent_requests"`
}
func (ul *UpstreamLimits) IsZero() bool {
zeroVal := UpstreamLimits{}
return *ul == zeroVal
}
func (ul UpstreamLimits) Validate() error {
if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
return fmt.Errorf("max connections must be at least 0")
}
if ul.MaxPendingRequests != nil && *ul.MaxPendingRequests <= 0 {
return fmt.Errorf("max pending requests must be at least 0")
}
if ul.MaxConcurrentRequests != nil && *ul.MaxConcurrentRequests <= 0 {
return fmt.Errorf("max concurrent requests must be at least 0")
}
return nil
}
type OpaqueUpstreamConfig struct {
Upstream ServiceID
Config map[string]interface{}
}
type UpstreamConfigs []UpstreamConfig
type OpaqueUpstreamConfigs []OpaqueUpstreamConfig
func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
func (configs OpaqueUpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
for _, usconf := range configs {
if usconf.Upstream.Matches(sid) {
return usconf.Config, true
@ -611,7 +836,7 @@ func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[stri
type ServiceConfigResponse struct {
ProxyConfig map[string]interface{}
UpstreamConfigs map[string]map[string]interface{}
UpstreamIDConfigs UpstreamConfigs
UpstreamIDConfigs OpaqueUpstreamConfigs
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
QueryMeta

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/hcl"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -112,6 +113,33 @@ func TestDecodeConfigEntry(t *testing.T) {
mesh_gateway {
mode = "remote"
}
connect {
upstream_configs {
redis {
passive_health_check {
interval = "2s"
max_failures = 3
}
}
"finance/billing" {
mesh_gateway {
mode = "remote"
}
}
}
upstream_defaults {
connect_timeout_ms = 5
protocol = "http"
envoy_listener_json = "foo"
envoy_cluster_json = "bar"
limits {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
}
}
`,
camel: `
Kind = "service-defaults"
@ -125,6 +153,33 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway {
Mode = "remote"
}
Connect {
UpstreamConfigs {
"redis" {
PassiveHealthCheck {
MaxFailures = 3
Interval = "2s"
}
}
"finance/billing" {
MeshGateway {
Mode = "remote"
}
}
}
UpstreamDefaults {
EnvoyListenerJSON = "foo"
EnvoyClusterJSON = "bar"
ConnectTimeoutMs = 5
Protocol = "http"
Limits {
MaxConnections = 3
MaxPendingRequests = 4
MaxConcurrentRequests = 5
}
}
}
`,
expect: &ServiceConfigEntry{
Kind: "service-defaults",
@ -138,6 +193,30 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
PassiveHealthCheck: &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
UpstreamDefaults: &UpstreamConfig{
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
},
},
},
},
{
@ -1330,7 +1409,485 @@ func TestConfigEntryResponseMarshalling(t *testing.T) {
}
}
func TestPassiveHealthCheck_Validate(t *testing.T) {
tt := []struct {
name string
input PassiveHealthCheck
wantErr bool
wantMsg string
}{
{
name: "valid-interval",
input: PassiveHealthCheck{Interval: 2 * time.Second},
wantErr: false,
},
{
name: "negative-interval",
input: PassiveHealthCheck{Interval: -1 * time.Second},
wantErr: true,
wantMsg: "greater than 0s",
},
{
name: "zero-interval",
input: PassiveHealthCheck{Interval: 0 * time.Second},
wantErr: true,
wantMsg: "greater than 0s",
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Validate()
if err == nil {
require.False(t, tc.wantErr)
return
}
require.Contains(t, err.Error(), tc.wantMsg)
})
}
}
func TestUpstreamLimits_Validate(t *testing.T) {
tt := []struct {
name string
input UpstreamLimits
wantErr bool
wantMsg string
}{
{
name: "valid-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "valid-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "valid-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Validate()
if err == nil {
require.False(t, tc.wantErr)
return
}
require.Contains(t, err.Error(), tc.wantMsg)
})
}
}
func TestServiceConfigEntry_Normalize(t *testing.T) {
tt := []struct {
name string
input ServiceConfigEntry
expect ServiceConfigEntry
}{
{
name: "fill-in-kind",
input: ServiceConfigEntry{
Name: "web",
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
},
},
{
name: "lowercase-protocol",
input: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Protocol: "PrOtoCoL",
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Protocol: "protocol",
},
},
{
name: "connect-kitchen-sink",
input: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
Protocol: "TcP",
},
"memcached": {
ConnectTimeoutMs: -1,
},
},
UpstreamDefaults: &UpstreamConfig{ConnectTimeoutMs: -20},
},
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
Protocol: "tcp",
ConnectTimeoutMs: 5000,
},
"memcached": {
ConnectTimeoutMs: 5000,
},
},
UpstreamDefaults: &UpstreamConfig{
ConnectTimeoutMs: 5000,
},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Normalize()
require.NoError(t, err)
require.Equal(t, tc.expect, tc.input)
})
}
}
func TestUpstreamConfig_MergeInto(t *testing.T) {
tt := []struct {
name string
source UpstreamConfig
destination map[string]interface{}
want map[string]interface{}
}{
{
name: "kitchen sink",
source: UpstreamConfig{
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
PassiveHealthCheck: &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
destination: make(map[string]interface{}),
want: map[string]interface{}{
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
"passive_health_check": &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
{
name: "kitchen sink override of destination",
source: UpstreamConfig{
EnvoyListenerJSON: "foo",
EnvoyClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
PassiveHealthCheck: &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
destination: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": &PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": &UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
"passive_health_check": &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
{
name: "empty source leaves destination intact",
source: UpstreamConfig{},
destination: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": &PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"envoy_listener_json": "zip",
"envoy_cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": &UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": &PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
},
{
name: "empty source and destination is a noop",
source: UpstreamConfig{},
destination: make(map[string]interface{}),
want: map[string]interface{}{},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
tc.source.MergeInto(tc.destination)
assert.Equal(t, tc.want, tc.destination)
})
}
}
func TestParseUpstreamConfig(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
want UpstreamConfig
}{
{
name: "defaults - nil",
input: nil,
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
},
},
{
name: "defaults - empty",
input: map[string]interface{}{},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
},
},
{
name: "defaults - other stuff",
input: map[string]interface{}{
"foo": "bar",
"envoy_foo": "envoy_bar",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
},
},
{
name: "protocol override",
input: map[string]interface{}{
"protocol": "http",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "http",
},
},
{
name: "connect timeout override, string",
input: map[string]interface{}{
"connect_timeout_ms": "1000",
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
},
},
{
name: "connect timeout override, float ",
input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0),
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
},
},
{
name: "connect timeout override, int ",
input: map[string]interface{}{
"connect_timeout_ms": 1000,
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
},
},
{
name: "connect limits map",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 50,
"max_pending_requests": 60,
"max_concurrent_requests": 70,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Limits: &UpstreamLimits{
MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
},
},
},
{
name: "connect limits map zero",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 0,
"max_pending_requests": 0,
"max_concurrent_requests": 0,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Limits: &UpstreamLimits{
MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
},
},
},
{
name: "passive health check map",
input: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"interval": "22s",
"max_failures": 7,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
PassiveHealthCheck: &PassiveHealthCheck{
Interval: 22 * time.Second,
MaxFailures: 7,
},
},
},
{
name: "mesh gateway map",
input: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseUpstreamConfig(tt.input)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
func requireContainsLower(t *testing.T, haystack, needle string) {
t.Helper()
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))
}
func intPointer(i int) *int {
return &i
}

@ -386,14 +386,14 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
}
sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain)
cfg, err := ParseUpstreamConfig(upstream.Config)
cfg, err := structs.ParseUpstreamConfig(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), "error", err)
}
if cfg.ClusterJSON != "" {
c, err = makeClusterFromUserConfig(cfg.ClusterJSON)
if cfg.EnvoyClusterJSON != "" {
c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
if err != nil {
return c, err
}
@ -416,7 +416,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{}
@ -448,7 +448,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier())
}
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
@ -457,11 +457,11 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
}
var escapeHatchCluster *envoy_cluster_v3.Cluster
if cfg.ClusterJSON != "" {
if cfg.EnvoyClusterJSON != "" {
if chain.IsDefault() {
// If you haven't done anything to setup the discovery chain, then
// you can use the envoy_cluster_json escape hatch.
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
if err != nil {
return nil, err
}
@ -524,7 +524,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
}
var lb *structs.LoadBalancer
@ -734,15 +734,13 @@ func (s *Server) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts gatewayC
return cluster
}
func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
var empty UpstreamLimits
// Make sure to not create any thresholds when passed the zero-value in order
// to rely on Envoy defaults
if limits == empty {
func makeThresholdsIfNeeded(limits *structs.UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
if limits == nil {
return nil
}
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
// Likewise, make sure to not set any threshold values on the zero-value in
// order to rely on Envoy defaults
if limits.MaxConnections != nil {

@ -1,10 +1,8 @@
package xds
import (
"strings"
"time"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
"strings"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
@ -150,75 +148,15 @@ func ParseGatewayConfig(m map[string]interface{}) (GatewayConfig, error) {
return cfg, err
}
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections *int `mapstructure:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests *int `mapstructure:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests *int `mapstructure:"max_concurrent_requests"`
}
// UpstreamConfig describes the keys we understand from
// Connect.Proxy.Upstream[*].Config.
type UpstreamConfig struct {
// ListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ListenerJSON string `mapstructure:"envoy_listener_json"`
// ClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ClusterJSON string `mapstructure:"envoy_cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string `mapstructure:"protocol"`
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits UpstreamLimits `mapstructure:"limits"`
// PassiveHealthCheck configuration
PassiveHealthCheck PassiveHealthCheck `mapstructure:"passive_health_check"`
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `mapstructure:"max_failures"`
}
// Return an envoy.OutlierDetection populated by the values from this struct.
// If all values are zero a default empty OutlierDetection will be returned to
// enable outlier detection with default values.
func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetection {
func ToOutlierDetection(p *structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierDetection {
od := &envoy_cluster_v3.OutlierDetection{}
if p == nil {
return od
}
if p.Interval != 0 {
od.Interval = ptypes.DurationProto(p.Interval)
}
@ -227,41 +165,3 @@ func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetect
}
return od
}
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
var cfg UpstreamConfig
config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice,
decode.HookTranslateKeys,
mapstructure.StringToTimeDurationHookFunc(),
),
Result: &cfg,
WeaklyTypedInput: true,
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return cfg, err
}
err = decoder.Decode(m)
return cfg, err
}
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned)
if cfg.Protocol == "" {
cfg.Protocol = "tcp"
} else {
cfg.Protocol = strings.ToLower(cfg.Protocol)
}
if cfg.ConnectTimeoutMs < 1 {
cfg.ConnectTimeoutMs = 5000
}
return cfg, err
}

@ -2,11 +2,9 @@ package xds
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestParseProxyConfig(t *testing.T) {
@ -168,144 +166,6 @@ func TestParseProxyConfig(t *testing.T) {
}
}
func TestParseUpstreamConfig(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
want UpstreamConfig
}{
{
name: "defaults - nil",
input: nil,
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - empty",
input: map[string]interface{}{},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - other stuff",
input: map[string]interface{}{
"foo": "bar",
"envoy_foo": "envoy_bar",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "protocol override",
input: map[string]interface{}{
"protocol": "http",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "http",
},
},
{
name: "connect timeout override, string",
input: map[string]interface{}{
"connect_timeout_ms": "1000",
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, float ",
input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0),
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, int ",
input: map[string]interface{}{
"connect_timeout_ms": 1000,
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect limits map",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 50,
"max_pending_requests": 60,
"max_concurrent_requests": 70,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: UpstreamLimits{
MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
},
},
},
{
name: "connect limits map zero",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 0,
"max_pending_requests": 0,
"max_concurrent_requests": 0,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: UpstreamLimits{
MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
},
},
},
{
name: "passive health check map",
input: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"interval": "22s",
"max_failures": 7,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
PassiveHealthCheck: PassiveHealthCheck{
Interval: 22 * time.Second,
MaxFailures: 7,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseUpstreamConfig(tt.input)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
func TestParseGatewayConfig(t *testing.T) {
tests := []struct {
name string

@ -312,7 +312,7 @@ func (s *Server) endpointsFromDiscoveryChain(
return resources
}
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
@ -321,11 +321,11 @@ func (s *Server) endpointsFromDiscoveryChain(
}
var escapeHatchCluster *envoy_cluster_v3.Cluster
if cfg.ClusterJSON != "" {
if cfg.EnvoyClusterJSON != "" {
if chain.IsDefault() {
// If you haven't done anything to setup the discovery chain, then
// you can use the envoy_cluster_json escape hatch.
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
if err != nil {
return resources
}

@ -987,8 +987,8 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
l := makeListener(upstreamID, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
cfg := getAndModifyUpstreamConfigForListener(s.Logger, u, chain)
if cfg.ListenerJSON != "" {
return makeListenerFromUserConfig(cfg.ListenerJSON)
if cfg.EnvoyListenerJSON != "" {
return makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
}
useRDS := true
@ -1071,14 +1071,14 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
return l, nil
}
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig {
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
var (
cfg UpstreamConfig
cfg structs.UpstreamConfig
err error
)
if chain == nil || chain.IsDefault() {
cfg, err = ParseUpstreamConfig(u.Config)
cfg, err = structs.ParseUpstreamConfig(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
@ -1087,19 +1087,19 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
} else {
// Use NoDefaults here so that we can set the protocol to the chain
// protocol if necessary
cfg, err = ParseUpstreamConfigNoDefaults(u.Config)
cfg, err = structs.ParseUpstreamConfigNoDefaults(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
}
if cfg.ListenerJSON != "" {
if cfg.EnvoyListenerJSON != "" {
logger.Warn("ignoring escape hatch setting because already configured for",
"discovery chain", chain.ServiceName, "upstream", u.Identifier(), "config", "envoy_listener_json")
// Remove from config struct so we don't use it later on
cfg.ListenerJSON = ""
cfg.EnvoyListenerJSON = ""
}
proto := cfg.Protocol

@ -91,15 +91,91 @@ type ExposePath struct {
ParsedFromCheck bool
}
type ConnectConfiguration struct {
// UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
// UpstreamDefaults contains default configuration for all upstreams of a given service
UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
}
type UpstreamConfig struct {
// EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
EnvoyListenerJSON string `json:",omitempty" alias:"envoy_listener_json"`
// EnvoyClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
EnvoyClusterJSON string `json:",omitempty" alias:"envoy_cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string `json:",omitempty"`
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `json:",omitempty" alias:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits *UpstreamLimits `json:",omitempty"`
// PassiveHealthCheck configuration determines how upstream proxy instances will
// be monitored for removal from the load balancing pool.
PassiveHealthCheck *PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
// MeshGatewayConfig controls how Mesh Gateways are configured and used
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration `json:",omitempty"`
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `alias:"max_failures"`
}
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections int `alias:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests int `alias:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests int `alias:"max_concurrent_requests"`
}
type ServiceConfigEntry struct {
Kind string
Name string
Namespace string `json:",omitempty"`
Protocol string `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty" alias:"external_sni"`
Meta map[string]string `json:",omitempty"`
Namespace string `json:",omitempty"`
Protocol string `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Connect ConnectConfiguration `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty" alias:"external_sni"`
Meta map[string]string `json:",omitempty"`
CreateIndex uint64
ModifyIndex uint64
}

@ -332,6 +332,36 @@ func TestDecodeConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123",
"MeshGateway": {
"Mode": "remote"
},
"Connect": {
"UpstreamConfigs": {
"redis": {
"PassiveHealthCheck": {
"MaxFailures": 3,
"Interval": "2s"
}
},
"finance/billing": {
"MeshGateway": {
"Mode": "remote"
}
}
},
"UpstreamDefaults": {
"EnvoyClusterJSON": "zip",
"EnvoyListenerJSON": "zop",
"ConnectTimeoutMs": 5000,
"Protocol": "http",
"Limits": {
"MaxConnections": 3,
"MaxPendingRequests": 4,
"MaxConcurrentRequests": 5
},
"PassiveHealthCheck": {
"MaxFailures": 5,
"Interval": "4s"
}
}
}
}
`,
@ -347,6 +377,34 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
Connect: ConnectConfiguration{
UpstreamConfigs: map[string]UpstreamConfig{
"redis": {
PassiveHealthCheck: &PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: MeshGatewayConfig{Mode: "remote"},
},
},
UpstreamDefaults: UpstreamConfig{
EnvoyClusterJSON: "zip",
EnvoyListenerJSON: "zop",
Protocol: "http",
ConnectTimeoutMs: 5000,
Limits: &UpstreamLimits{
MaxConnections: 3,
MaxPendingRequests: 4,
MaxConcurrentRequests: 5,
},
PassiveHealthCheck: &PassiveHealthCheck{
MaxFailures: 5,
Interval: 4 * time.Second,
},
},
},
},
},
{

@ -423,7 +423,7 @@ func TestParseConfigEntry(t *testing.T) {
},
},
{
name: "service-defaults",
name: "service-defaults: kitchen sink",
snake: `
kind = "service-defaults"
name = "main"
@ -436,6 +436,36 @@ func TestParseConfigEntry(t *testing.T) {
mesh_gateway {
mode = "remote"
}
connect {
upstream_configs {
"redis" {
passive_health_check {
max_failures = 3
interval = "2s"
}
}
"finance/billing" {
mesh_gateway {
mode = "remote"
}
}
}
upstream_defaults {
envoy_cluster_json = "zip"
envoy_listener_json = "zop"
connect_timeout_ms = 5000
protocol = "http"
limits {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
passive_health_check {
max_failures = 5
interval = "4s"
}
}
}
`,
camel: `
Kind = "service-defaults"
@ -449,6 +479,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway {
Mode = "remote"
}
connect = {
upstream_configs = {
"redis" = {
passive_health_check = {
max_failures = 3
interval = "2s"
}
}
"finance/billing" = {
mesh_gateway = {
mode = "remote"
}
}
}
upstream_defaults = {
envoy_cluster_json = "zip"
envoy_listener_json = "zop"
connect_timeout_ms = 5000
protocol = "http"
limits = {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
passive_health_check = {
max_failures = 5
interval = "4s"
}
}
}
`,
snakeJSON: `
{
@ -462,6 +522,36 @@ func TestParseConfigEntry(t *testing.T) {
"external_sni": "abc-123",
"mesh_gateway": {
"mode": "remote"
},
"connect": {
"upstream_configs": {
"redis": {
"passive_health_check": {
"max_failures": 3,
"interval": "2s"
}
},
"finance/billing": {
"mesh_gateway": {
"mode": "remote"
}
}
},
"upstream_defaults": {
"envoy_cluster_json": "zip",
"envoy_listener_json": "zop",
"connect_timeout_ms": 5000,
"protocol": "http",
"limits": {
"max_connections": 3,
"max_pending_requests": 4,
"max_concurrent_requests": 5
},
"passive_health_check": {
"max_failures": 5,
"interval": "4s"
}
}
}
}
`,
@ -477,6 +567,36 @@ func TestParseConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123",
"MeshGateway": {
"Mode": "remote"
},
"Connect": {
"UpstreamConfigs": {
"redis": {
"PassiveHealthCheck": {
"MaxFailures": 3,
"Interval": "2s"
}
},
"finance/billing": {
"MeshGateway": {
"Mode": "remote"
}
}
},
"UpstreamDefaults": {
"EnvoyClusterJSON": "zip",
"EnvoyListenerJSON": "zop",
"ConnectTimeoutMs": 5000,
"Protocol": "http",
"Limits": {
"MaxConnections": 3,
"MaxPendingRequests": 4,
"MaxConcurrentRequests": 5
},
"PassiveHealthCheck": {
"MaxFailures": 5,
"Interval": "4s"
}
}
}
}
`,
@ -492,6 +612,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway: api.MeshGatewayConfig{
Mode: api.MeshGatewayModeRemote,
},
Connect: api.ConnectConfiguration{
UpstreamConfigs: map[string]api.UpstreamConfig{
"redis": {
PassiveHealthCheck: &api.PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: api.MeshGatewayConfig{
Mode: "remote",
},
},
},
UpstreamDefaults: api.UpstreamConfig{
EnvoyClusterJSON: "zip",
EnvoyListenerJSON: "zop",
Protocol: "http",
ConnectTimeoutMs: 5000,
Limits: &api.UpstreamLimits{
MaxConnections: 3,
MaxPendingRequests: 4,
MaxConcurrentRequests: 5,
},
PassiveHealthCheck: &api.PassiveHealthCheck{
MaxFailures: 5,
Interval: 4 * time.Second,
},
},
},
},
},
{

Loading…
Cancel
Save