diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go
index 8ed22e9f5a..bdda31b179 100644
--- a/agent/service_manager_test.go
+++ b/agent/service_manager_test.go
@@ -418,8 +418,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1,
"protocol": "http",
},
- UpstreamIDConfigs: structs.UpstreamConfigs{
- structs.UpstreamConfig{
+ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
+ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
@@ -459,8 +459,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1,
"protocol": "http",
},
- UpstreamIDConfigs: structs.UpstreamConfigs{
- structs.UpstreamConfig{
+ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
+ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
@@ -634,8 +634,8 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
"foo": 1,
"protocol": "http",
},
- UpstreamIDConfigs: structs.UpstreamConfigs{
- structs.UpstreamConfig{
+ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
+ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{
"protocol": "tcp",
diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go
index 268c504056..33a50cd4dd 100644
--- a/agent/structs/config_entry.go
+++ b/agent/structs/config_entry.go
@@ -87,11 +87,7 @@ type ServiceConfigEntry struct {
ExternalSNI string `json:",omitempty" alias:"external_sni"`
- // TODO(banks): enable this once we have upstreams supported too. Enabling
- // sidecars actually makes no sense and adds complications when you don't
- // allow upstreams to be specified centrally too.
- //
- // Connect ConnectConfiguration
+ Connect *ConnectConfiguration `json:",omitempty"`
Meta map[string]string `json:",omitempty"`
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@@ -131,13 +127,20 @@ func (e *ServiceConfigEntry) Normalize() error {
e.Kind = ServiceDefaults
e.Protocol = strings.ToLower(e.Protocol)
+ e.Connect.Normalize()
e.EnterpriseMeta.Normalize()
return nil
}
func (e *ServiceConfigEntry) Validate() error {
- return validateConfigEntryMeta(e.Meta)
+ validationErr := validateConfigEntryMeta(e.Meta)
+
+ if err := e.Connect.Validate(); err != nil {
+ validationErr = multierror.Append(validationErr, err)
+ }
+
+ return validationErr
}
func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool {
@@ -169,7 +172,38 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *EnterpriseMeta {
}
type ConnectConfiguration struct {
- SidecarProxy bool
+ // UpstreamConfigs is a map of service to per-upstream configuration
+ UpstreamConfigs map[string]*UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
+
+ // UpstreamDefaults contains default configuration for all upstreams of a given service
+ UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
+}
+
+func (cfg *ConnectConfiguration) Normalize() {
+ if cfg == nil {
+ return
+ }
+ for _, v := range cfg.UpstreamConfigs {
+ v.Normalize()
+ }
+
+ cfg.UpstreamDefaults.Normalize()
+}
+
+func (cfg ConnectConfiguration) Validate() error {
+ var validationErr error
+
+ for k, v := range cfg.UpstreamConfigs {
+ if err := v.Validate(); err != nil {
+ validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream config for %s: %v", k, err))
+ }
+ }
+
+ if err := cfg.UpstreamDefaults.Validate(); err != nil {
+ validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
+ }
+
+ return validationErr
}
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
@@ -592,13 +626,125 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
}
type UpstreamConfig struct {
+ // ListenerJSON is a complete override ("escape hatch") for the upstream's
+ // listener.
+ //
+ // Note: This escape hatch is NOT compatible with the discovery chain and
+ // will be ignored if a discovery chain is active.
+ ListenerJSON string `json:",omitempty" alias:"listener_json"`
+
+ // ClusterJSON is a complete override ("escape hatch") for the upstream's
+ // cluster. The Connect client TLS certificate and context will be injected
+ // overriding any TLS settings present.
+ //
+ // Note: This escape hatch is NOT compatible with the discovery chain and
+ // will be ignored if a discovery chain is active.
+ ClusterJSON string `alias:"cluster_json"`
+
+ // Protocol describes the upstream's service protocol. Valid values are "tcp",
+ // "http" and "grpc". Anything else is treated as tcp. The enables protocol
+ // aware features like per-request metrics and connection pooling, tracing,
+ // routing etc.
+ Protocol string
+
+ // ConnectTimeoutMs is the number of milliseconds to timeout making a new
+ // connection to this upstream. Defaults to 5000 (5 seconds) if not set.
+ ConnectTimeoutMs int `alias:"connect_timeout_ms"`
+
+ // Limits are the set of limits that are applied to the proxy for a specific upstream of a
+ // service instance.
+ Limits UpstreamLimits
+
+ // PassiveHealthCheck configuration determines how upstream proxy instances will
+ // be monitored for removal from the load balancing pool.
+ PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
+
+ // MeshGatewayConfig controls how Mesh Gateways are configured and used
+ MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
+}
+
+func (cfg *UpstreamConfig) Normalize() {
+ if cfg.Protocol == "" {
+ cfg.Protocol = "tcp"
+ } else {
+ cfg.Protocol = strings.ToLower(cfg.Protocol)
+ }
+
+ if cfg.ConnectTimeoutMs < 1 {
+ cfg.ConnectTimeoutMs = 5000
+ }
+}
+
+func (cfg UpstreamConfig) Validate() error {
+ var validationErr error
+
+ if err := cfg.PassiveHealthCheck.Validate(); err != nil {
+ validationErr = multierror.Append(validationErr, err)
+ }
+ if err := cfg.Limits.Validate(); err != nil {
+ validationErr = multierror.Append(validationErr, err)
+ }
+
+ return validationErr
+}
+
+type PassiveHealthCheck struct {
+ // Interval between health check analysis sweeps. Each sweep may remove
+ // hosts or return hosts to the pool.
+ Interval time.Duration
+
+ // MaxFailures is the count of consecutive failures that results in a host
+ // being removed from the pool.
+ MaxFailures uint32 `alias:"max_failures"`
+}
+
+func (chk PassiveHealthCheck) Validate() error {
+ if chk.Interval <= 0*time.Second {
+ return fmt.Errorf("passive health check interval must be greater than 0s")
+ }
+ return nil
+}
+
+// UpstreamLimits describes the limits that are associated with a specific
+// upstream of a service instance.
+type UpstreamLimits struct {
+ // MaxConnections is the maximum number of connections the local proxy can
+ // make to the upstream service.
+ MaxConnections *int `alias:"max_connections"`
+
+ // MaxPendingRequests is the maximum number of requests that will be queued
+ // waiting for an available connection. This is mostly applicable to HTTP/1.1
+ // clusters since all HTTP/2 requests are streamed over a single
+ // connection.
+ MaxPendingRequests *int `alias:"max_pending_requests"`
+
+ // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
+ // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
+ // clusters since all HTTP/1.1 requests are limited by MaxConnections.
+ MaxConcurrentRequests *int `alias:"max_concurrent_requests"`
+}
+
+func (ul UpstreamLimits) Validate() error {
+ if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
+ return fmt.Errorf("max connections must be at least 0")
+ }
+ if ul.MaxPendingRequests != nil && *ul.MaxPendingRequests <= 0 {
+ return fmt.Errorf("max pending requests must be at least 0")
+ }
+ if ul.MaxConcurrentRequests != nil && *ul.MaxConcurrentRequests <= 0 {
+ return fmt.Errorf("max concurrent requests must be at least 0")
+ }
+ return nil
+}
+
+type OpaqueUpstreamConfig struct {
Upstream ServiceID
Config map[string]interface{}
}
-type UpstreamConfigs []UpstreamConfig
+type OpaqueUpstreamConfigs []OpaqueUpstreamConfig
-func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
+func (configs OpaqueUpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
for _, usconf := range configs {
if usconf.Upstream.Matches(sid) {
return usconf.Config, true
@@ -611,7 +757,7 @@ func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[stri
type ServiceConfigResponse struct {
ProxyConfig map[string]interface{}
UpstreamConfigs map[string]map[string]interface{}
- UpstreamIDConfigs UpstreamConfigs
+ UpstreamIDConfigs OpaqueUpstreamConfigs
MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"`
QueryMeta
diff --git a/agent/structs/config_entry_test.go b/agent/structs/config_entry_test.go
index e5a92de3ff..272e1a1833 100644
--- a/agent/structs/config_entry_test.go
+++ b/agent/structs/config_entry_test.go
@@ -112,6 +112,33 @@ func TestDecodeConfigEntry(t *testing.T) {
mesh_gateway {
mode = "remote"
}
+ connect {
+ upstream_configs {
+ redis {
+ passive_health_check {
+ interval = "2s"
+ max_failures = 3
+ }
+ }
+
+ "finance/billing" {
+ mesh_gateway {
+ mode = "remote"
+ }
+ }
+ }
+ upstream_defaults {
+ connect_timeout_ms = 5
+ protocol = "http"
+ listener_json = "foo"
+ cluster_json = "bar"
+ limits {
+ max_connections = 3
+ max_pending_requests = 4
+ max_concurrent_requests = 5
+ }
+ }
+ }
`,
camel: `
Kind = "service-defaults"
@@ -125,6 +152,33 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway {
Mode = "remote"
}
+ Connect {
+ UpstreamConfigs {
+ "redis" {
+ PassiveHealthCheck {
+ MaxFailures = 3
+ Interval = "2s"
+ }
+ }
+
+ "finance/billing" {
+ MeshGateway {
+ Mode = "remote"
+ }
+ }
+ }
+ UpstreamDefaults {
+ ListenerJSON = "foo"
+ ClusterJSON = "bar"
+ ConnectTimeoutMs = 5
+ Protocol = "http"
+ Limits {
+ MaxConnections = 3
+ MaxPendingRequests = 4
+ MaxConcurrentRequests = 5
+ }
+ }
+ }
`,
expect: &ServiceConfigEntry{
Kind: "service-defaults",
@@ -138,6 +192,30 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
+ Connect: &ConnectConfiguration{
+ UpstreamConfigs: map[string]*UpstreamConfig{
+ "redis": {
+ PassiveHealthCheck: PassiveHealthCheck{
+ MaxFailures: 3,
+ Interval: 2 * time.Second,
+ },
+ },
+ "finance/billing": {
+ MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
+ },
+ },
+ UpstreamDefaults: &UpstreamConfig{
+ ListenerJSON: "foo",
+ ClusterJSON: "bar",
+ ConnectTimeoutMs: 5,
+ Protocol: "http",
+ Limits: UpstreamLimits{
+ MaxConnections: intPointer(3),
+ MaxPendingRequests: intPointer(4),
+ MaxConcurrentRequests: intPointer(5),
+ },
+ },
+ },
},
},
{
@@ -1330,7 +1408,199 @@ func TestConfigEntryResponseMarshalling(t *testing.T) {
}
}
+func TestPassiveHealthCheck_Validate(t *testing.T) {
+ tt := []struct {
+ name string
+ input PassiveHealthCheck
+ wantErr bool
+ wantMsg string
+ }{
+ {
+ name: "valid-interval",
+ input: PassiveHealthCheck{Interval: 2 * time.Second},
+ wantErr: false,
+ },
+ {
+ name: "negative-interval",
+ input: PassiveHealthCheck{Interval: -1 * time.Second},
+ wantErr: true,
+ wantMsg: "greater than 0s",
+ },
+ {
+ name: "zero-interval",
+ input: PassiveHealthCheck{Interval: 0 * time.Second},
+ wantErr: true,
+ wantMsg: "greater than 0s",
+ },
+ }
+
+ for _, tc := range tt {
+ t.Run(tc.name, func(t *testing.T) {
+ err := tc.input.Validate()
+ if err == nil {
+ require.False(t, tc.wantErr)
+ return
+ }
+ require.Contains(t, err.Error(), tc.wantMsg)
+ })
+ }
+}
+
+func TestUpstreamLimits_Validate(t *testing.T) {
+ tt := []struct {
+ name string
+ input UpstreamLimits
+ wantErr bool
+ wantMsg string
+ }{
+ {
+ name: "valid-max-conns",
+ input: UpstreamLimits{MaxConnections: intPointer(1)},
+ wantErr: false,
+ },
+ {
+ name: "zero-max-conns",
+ input: UpstreamLimits{MaxConnections: intPointer(0)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ {
+ name: "negative-max-conns",
+ input: UpstreamLimits{MaxConnections: intPointer(-1)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ {
+ name: "valid-max-concurrent",
+ input: UpstreamLimits{MaxConcurrentRequests: intPointer(1)},
+ wantErr: false,
+ },
+ {
+ name: "zero-max-concurrent",
+ input: UpstreamLimits{MaxConcurrentRequests: intPointer(0)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ {
+ name: "negative-max-concurrent",
+ input: UpstreamLimits{MaxConcurrentRequests: intPointer(-1)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ {
+ name: "valid-max-pending",
+ input: UpstreamLimits{MaxPendingRequests: intPointer(1)},
+ wantErr: false,
+ },
+ {
+ name: "zero-max-pending",
+ input: UpstreamLimits{MaxPendingRequests: intPointer(0)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ {
+ name: "negative-max-pending",
+ input: UpstreamLimits{MaxPendingRequests: intPointer(-1)},
+ wantErr: true,
+ wantMsg: "at least 0",
+ },
+ }
+
+ for _, tc := range tt {
+ t.Run(tc.name, func(t *testing.T) {
+ err := tc.input.Validate()
+ if err == nil {
+ require.False(t, tc.wantErr)
+ return
+ }
+ require.Contains(t, err.Error(), tc.wantMsg)
+ })
+ }
+}
+
+func TestServiceConfigEntry_Normalize(t *testing.T) {
+ tt := []struct {
+ name string
+ input ServiceConfigEntry
+ expect ServiceConfigEntry
+ }{
+ {
+ name: "fill-in-kind",
+ input: ServiceConfigEntry{
+ Name: "web",
+ },
+ expect: ServiceConfigEntry{
+ Kind: ServiceDefaults,
+ Name: "web",
+ },
+ },
+ {
+ name: "lowercase-protocol",
+ input: ServiceConfigEntry{
+ Kind: ServiceDefaults,
+ Name: "web",
+ Protocol: "PrOtoCoL",
+ },
+ expect: ServiceConfigEntry{
+ Kind: ServiceDefaults,
+ Name: "web",
+ Protocol: "protocol",
+ },
+ },
+ {
+ name: "connect-kitchen-sink",
+ input: ServiceConfigEntry{
+ Kind: ServiceDefaults,
+ Name: "web",
+ Connect: &ConnectConfiguration{
+ UpstreamConfigs: map[string]*UpstreamConfig{
+ "redis": {
+ Protocol: "TcP",
+ },
+ "memcached": {
+ ConnectTimeoutMs: -1,
+ },
+ },
+ UpstreamDefaults: &UpstreamConfig{ConnectTimeoutMs: -20},
+ },
+ },
+ expect: ServiceConfigEntry{
+ Kind: ServiceDefaults,
+ Name: "web",
+ Connect: &ConnectConfiguration{
+ UpstreamConfigs: map[string]*UpstreamConfig{
+ "redis": {
+ Protocol: "tcp",
+ ConnectTimeoutMs: 5000,
+ },
+ "memcached": {
+ Protocol: "tcp",
+ ConnectTimeoutMs: 5000,
+ },
+ },
+ UpstreamDefaults: &UpstreamConfig{
+ Protocol: "tcp",
+ ConnectTimeoutMs: 5000,
+ },
+ },
+ },
+ },
+ }
+
+ for _, tc := range tt {
+ t.Run(tc.name, func(t *testing.T) {
+ err := tc.input.Normalize()
+ require.NoError(t, err)
+ require.Equal(t, tc.expect, tc.input)
+ })
+ }
+}
+
func requireContainsLower(t *testing.T, haystack, needle string) {
t.Helper()
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))
}
+
+func intPointer(i int) *int {
+ return &i
+}
diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go
index fa527e2a70..0411ab147b 100644
--- a/agent/xds/clusters.go
+++ b/agent/xds/clusters.go
@@ -416,7 +416,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
- OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
+ OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
}
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{}
@@ -524,7 +524,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
},
- OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
+ OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
}
var lb *structs.LoadBalancer
@@ -734,8 +734,8 @@ func (s *Server) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts gatewayC
return cluster
}
-func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
- var empty UpstreamLimits
+func makeThresholdsIfNeeded(limits structs.UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
+ var empty structs.UpstreamLimits
// Make sure to not create any thresholds when passed the zero-value in order
// to rely on Envoy defaults
if limits == empty {
@@ -743,6 +743,7 @@ func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBr
}
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
+
// Likewise, make sure to not set any threshold values on the zero-value in
// order to rely on Envoy defaults
if limits.MaxConnections != nil {
diff --git a/agent/xds/config.go b/agent/xds/config.go
index 57fb0d25ef..dfec6493c2 100644
--- a/agent/xds/config.go
+++ b/agent/xds/config.go
@@ -1,10 +1,8 @@
package xds
import (
- "strings"
- "time"
-
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+ "strings"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
@@ -150,74 +148,10 @@ func ParseGatewayConfig(m map[string]interface{}) (GatewayConfig, error) {
return cfg, err
}
-// UpstreamLimits describes the limits that are associated with a specific
-// upstream of a service instance.
-type UpstreamLimits struct {
- // MaxConnections is the maximum number of connections the local proxy can
- // make to the upstream service.
- MaxConnections *int `mapstructure:"max_connections"`
-
- // MaxPendingRequests is the maximum number of requests that will be queued
- // waiting for an available connection. This is mostly applicable to HTTP/1.1
- // clusters since all HTTP/2 requests are streamed over a single
- // connection.
- MaxPendingRequests *int `mapstructure:"max_pending_requests"`
-
- // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
- // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
- // clusters since all HTTP/1.1 requests are limited by MaxConnections.
- MaxConcurrentRequests *int `mapstructure:"max_concurrent_requests"`
-}
-
-// UpstreamConfig describes the keys we understand from
-// Connect.Proxy.Upstream[*].Config.
-type UpstreamConfig struct {
- // ListenerJSON is a complete override ("escape hatch") for the upstream's
- // listener.
- //
- // Note: This escape hatch is NOT compatible with the discovery chain and
- // will be ignored if a discovery chain is active.
- ListenerJSON string `mapstructure:"envoy_listener_json"`
-
- // ClusterJSON is a complete override ("escape hatch") for the upstream's
- // cluster. The Connect client TLS certificate and context will be injected
- // overriding any TLS settings present.
- //
- // Note: This escape hatch is NOT compatible with the discovery chain and
- // will be ignored if a discovery chain is active.
- ClusterJSON string `mapstructure:"envoy_cluster_json"`
-
- // Protocol describes the upstream's service protocol. Valid values are "tcp",
- // "http" and "grpc". Anything else is treated as tcp. The enables protocol
- // aware features like per-request metrics and connection pooling, tracing,
- // routing etc.
- Protocol string `mapstructure:"protocol"`
-
- // ConnectTimeoutMs is the number of milliseconds to timeout making a new
- // connection to this upstream. Defaults to 5000 (5 seconds) if not set.
- ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
-
- // Limits are the set of limits that are applied to the proxy for a specific upstream of a
- // service instance.
- Limits UpstreamLimits `mapstructure:"limits"`
-
- // PassiveHealthCheck configuration
- PassiveHealthCheck PassiveHealthCheck `mapstructure:"passive_health_check"`
-}
-
-type PassiveHealthCheck struct {
- // Interval between health check analysis sweeps. Each sweep may remove
- // hosts or return hosts to the pool.
- Interval time.Duration
- // MaxFailures is the count of consecutive failures that results in a host
- // being removed from the pool.
- MaxFailures uint32 `mapstructure:"max_failures"`
-}
-
// Return an envoy.OutlierDetection populated by the values from this struct.
// If all values are zero a default empty OutlierDetection will be returned to
// enable outlier detection with default values.
-func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetection {
+func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierDetection {
od := &envoy_cluster_v3.OutlierDetection{}
if p.Interval != 0 {
od.Interval = ptypes.DurationProto(p.Interval)
@@ -228,8 +162,8 @@ func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetect
return od
}
-func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
- var cfg UpstreamConfig
+func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) {
+ var cfg structs.UpstreamConfig
config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice,
@@ -252,16 +186,11 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
-func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
+func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
+
// Set defaults (even if error is returned)
- if cfg.Protocol == "" {
- cfg.Protocol = "tcp"
- } else {
- cfg.Protocol = strings.ToLower(cfg.Protocol)
- }
- if cfg.ConnectTimeoutMs < 1 {
- cfg.ConnectTimeoutMs = 5000
- }
+ cfg.Normalize()
+
return cfg, err
}
diff --git a/agent/xds/config_test.go b/agent/xds/config_test.go
index e1e8c7276c..0265c01518 100644
--- a/agent/xds/config_test.go
+++ b/agent/xds/config_test.go
@@ -172,12 +172,12 @@ func TestParseUpstreamConfig(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
- want UpstreamConfig
+ want structs.UpstreamConfig
}{
{
name: "defaults - nil",
input: nil,
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
@@ -185,7 +185,7 @@ func TestParseUpstreamConfig(t *testing.T) {
{
name: "defaults - empty",
input: map[string]interface{}{},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
@@ -196,7 +196,7 @@ func TestParseUpstreamConfig(t *testing.T) {
"foo": "bar",
"envoy_foo": "envoy_bar",
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
@@ -206,7 +206,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{
"protocol": "http",
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "http",
},
@@ -216,7 +216,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{
"connect_timeout_ms": "1000",
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
@@ -226,7 +226,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0),
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
@@ -236,7 +236,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{
"connect_timeout_ms": 1000,
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
@@ -250,10 +250,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_concurrent_requests": 70,
},
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
- Limits: UpstreamLimits{
+ Limits: structs.UpstreamLimits{
MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
@@ -269,10 +269,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_concurrent_requests": 0,
},
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
- Limits: UpstreamLimits{
+ Limits: structs.UpstreamLimits{
MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
@@ -287,10 +287,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_failures": 7,
},
},
- want: UpstreamConfig{
+ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
- PassiveHealthCheck: PassiveHealthCheck{
+ PassiveHealthCheck: structs.PassiveHealthCheck{
Interval: 22 * time.Second,
MaxFailures: 7,
},
diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go
index 3e5f552f48..77d33b47d4 100644
--- a/agent/xds/listeners.go
+++ b/agent/xds/listeners.go
@@ -1071,9 +1071,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
return l, nil
}
-func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig {
+func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
var (
- cfg UpstreamConfig
+ cfg structs.UpstreamConfig
err error
)
diff --git a/api/config_entry.go b/api/config_entry.go
index f5ef60e294..a6bace0a98 100644
--- a/api/config_entry.go
+++ b/api/config_entry.go
@@ -91,15 +91,91 @@ type ExposePath struct {
ParsedFromCheck bool
}
+type ConnectConfiguration struct {
+ // UpstreamConfigs is a map of service to per-upstream configuration
+ UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
+
+ // UpstreamDefaults contains default configuration for all upstreams of a given service
+ UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
+}
+
+type UpstreamConfig struct {
+ // ListenerJSON is a complete override ("escape hatch") for the upstream's
+ // listener.
+ //
+ // Note: This escape hatch is NOT compatible with the discovery chain and
+ // will be ignored if a discovery chain is active.
+ ListenerJSON string `json:",omitempty" alias:"listener_json"`
+
+ // ClusterJSON is a complete override ("escape hatch") for the upstream's
+ // cluster. The Connect client TLS certificate and context will be injected
+ // overriding any TLS settings present.
+ //
+ // Note: This escape hatch is NOT compatible with the discovery chain and
+ // will be ignored if a discovery chain is active.
+ ClusterJSON string `alias:"cluster_json"`
+
+ // Protocol describes the upstream's service protocol. Valid values are "tcp",
+ // "http" and "grpc". Anything else is treated as tcp. The enables protocol
+ // aware features like per-request metrics and connection pooling, tracing,
+ // routing etc.
+ Protocol string
+
+ // ConnectTimeoutMs is the number of milliseconds to timeout making a new
+ // connection to this upstream. Defaults to 5000 (5 seconds) if not set.
+ ConnectTimeoutMs int `alias:"connect_timeout_ms"`
+
+ // Limits are the set of limits that are applied to the proxy for a specific upstream of a
+ // service instance.
+ Limits UpstreamLimits
+
+ // PassiveHealthCheck configuration determines how upstream proxy instances will
+ // be monitored for removal from the load balancing pool.
+ PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
+
+ // MeshGatewayConfig controls how Mesh Gateways are configured and used
+ MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
+}
+
+type PassiveHealthCheck struct {
+ // Interval between health check analysis sweeps. Each sweep may remove
+ // hosts or return hosts to the pool.
+ Interval time.Duration
+
+ // MaxFailures is the count of consecutive failures that results in a host
+ // being removed from the pool.
+ MaxFailures uint32 `alias:"max_failures"`
+}
+
+// UpstreamLimits describes the limits that are associated with a specific
+// upstream of a service instance.
+type UpstreamLimits struct {
+ // MaxConnections is the maximum number of connections the local proxy can
+ // make to the upstream service.
+ MaxConnections int `alias:"max_connections"`
+
+ // MaxPendingRequests is the maximum number of requests that will be queued
+ // waiting for an available connection. This is mostly applicable to HTTP/1.1
+ // clusters since all HTTP/2 requests are streamed over a single
+ // connection.
+ MaxPendingRequests int `alias:"max_pending_requests"`
+
+ // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
+ // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
+ // clusters since all HTTP/1.1 requests are limited by MaxConnections.
+ MaxConcurrentRequests int `alias:"max_concurrent_requests"`
+}
+
type ServiceConfigEntry struct {
Kind string
Name string
- Namespace string `json:",omitempty"`
- Protocol string `json:",omitempty"`
- MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
- Expose ExposeConfig `json:",omitempty"`
- ExternalSNI string `json:",omitempty" alias:"external_sni"`
- Meta map[string]string `json:",omitempty"`
+ Namespace string `json:",omitempty"`
+ Protocol string `json:",omitempty"`
+ MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
+ Connect ConnectConfiguration `json:",omitempty"`
+ Expose ExposeConfig `json:",omitempty"`
+ ExternalSNI string `json:",omitempty" alias:"external_sni"`
+ Meta map[string]string `json:",omitempty"`
CreateIndex uint64
ModifyIndex uint64
}
diff --git a/api/config_entry_test.go b/api/config_entry_test.go
index 334c972521..62b0d79664 100644
--- a/api/config_entry_test.go
+++ b/api/config_entry_test.go
@@ -332,6 +332,36 @@ func TestDecodeConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123",
"MeshGateway": {
"Mode": "remote"
+ },
+ "Connect": {
+ "UpstreamConfigs": {
+ "redis": {
+ "PassiveHealthCheck": {
+ "MaxFailures": 3,
+ "Interval": "2s"
+ }
+ },
+ "finance/billing": {
+ "MeshGateway": {
+ "Mode": "remote"
+ }
+ }
+ },
+ "UpstreamDefaults": {
+ "ClusterJSON": "zip",
+ "ListenerJSON": "zop",
+ "ConnectTimeoutMs": 5000,
+ "Protocol": "http",
+ "Limits": {
+ "MaxConnections": 3,
+ "MaxPendingRequests": 4,
+ "MaxConcurrentRequests": 5
+ },
+ "PassiveHealthCheck": {
+ "MaxFailures": 5,
+ "Interval": "4s"
+ }
+ }
}
}
`,
@@ -347,6 +377,34 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
+ Connect: ConnectConfiguration{
+ UpstreamConfigs: map[string]UpstreamConfig{
+ "redis": {
+ PassiveHealthCheck: PassiveHealthCheck{
+ MaxFailures: 3,
+ Interval: 2 * time.Second,
+ },
+ },
+ "finance/billing": {
+ MeshGateway: MeshGatewayConfig{Mode: "remote"},
+ },
+ },
+ UpstreamDefaults: UpstreamConfig{
+ ClusterJSON: "zip",
+ ListenerJSON: "zop",
+ Protocol: "http",
+ ConnectTimeoutMs: 5000,
+ Limits: UpstreamLimits{
+ MaxConnections: 3,
+ MaxPendingRequests: 4,
+ MaxConcurrentRequests: 5,
+ },
+ PassiveHealthCheck: PassiveHealthCheck{
+ MaxFailures: 5,
+ Interval: 4 * time.Second,
+ },
+ },
+ },
},
},
{
diff --git a/command/config/write/config_write_test.go b/command/config/write/config_write_test.go
index 814ccc3c0e..bc8a1ff799 100644
--- a/command/config/write/config_write_test.go
+++ b/command/config/write/config_write_test.go
@@ -423,7 +423,7 @@ func TestParseConfigEntry(t *testing.T) {
},
},
{
- name: "service-defaults",
+ name: "service-defaults: kitchen sink",
snake: `
kind = "service-defaults"
name = "main"
@@ -436,6 +436,36 @@ func TestParseConfigEntry(t *testing.T) {
mesh_gateway {
mode = "remote"
}
+ connect {
+ upstream_configs {
+ "redis" {
+ passive_health_check {
+ max_failures = 3
+ interval = "2s"
+ }
+ }
+ "finance/billing" {
+ mesh_gateway {
+ mode = "remote"
+ }
+ }
+ }
+ upstream_defaults {
+ cluster_json = "zip"
+ listener_json = "zop"
+ connect_timeout_ms = 5000
+ protocol = "http"
+ limits {
+ max_connections = 3
+ max_pending_requests = 4
+ max_concurrent_requests = 5
+ }
+ passive_health_check {
+ max_failures = 5
+ interval = "4s"
+ }
+ }
+ }
`,
camel: `
Kind = "service-defaults"
@@ -449,6 +479,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway {
Mode = "remote"
}
+ connect = {
+ upstream_configs = {
+ "redis" = {
+ passive_health_check = {
+ max_failures = 3
+ interval = "2s"
+ }
+ }
+ "finance/billing" = {
+ mesh_gateway = {
+ mode = "remote"
+ }
+ }
+ }
+ upstream_defaults = {
+ cluster_json = "zip"
+ listener_json = "zop"
+ connect_timeout_ms = 5000
+ protocol = "http"
+ limits = {
+ max_connections = 3
+ max_pending_requests = 4
+ max_concurrent_requests = 5
+ }
+ passive_health_check = {
+ max_failures = 5
+ interval = "4s"
+ }
+ }
+ }
`,
snakeJSON: `
{
@@ -462,6 +522,36 @@ func TestParseConfigEntry(t *testing.T) {
"external_sni": "abc-123",
"mesh_gateway": {
"mode": "remote"
+ },
+ "connect": {
+ "upstream_configs": {
+ "redis": {
+ "passive_health_check": {
+ "max_failures": 3,
+ "interval": "2s"
+ }
+ },
+ "finance/billing": {
+ "mesh_gateway": {
+ "mode": "remote"
+ }
+ }
+ },
+ "upstream_defaults": {
+ "cluster_json": "zip",
+ "listener_json": "zop",
+ "connect_timeout_ms": 5000,
+ "protocol": "http",
+ "limits": {
+ "max_connections": 3,
+ "max_pending_requests": 4,
+ "max_concurrent_requests": 5
+ },
+ "passive_health_check": {
+ "max_failures": 5,
+ "interval": "4s"
+ }
+ }
}
}
`,
@@ -477,6 +567,36 @@ func TestParseConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123",
"MeshGateway": {
"Mode": "remote"
+ },
+ "Connect": {
+ "UpstreamConfigs": {
+ "redis": {
+ "PassiveHealthCheck": {
+ "MaxFailures": 3,
+ "Interval": "2s"
+ }
+ },
+ "finance/billing": {
+ "MeshGateway": {
+ "Mode": "remote"
+ }
+ }
+ },
+ "UpstreamDefaults": {
+ "ClusterJSON": "zip",
+ "ListenerJSON": "zop",
+ "ConnectTimeoutMs": 5000,
+ "Protocol": "http",
+ "Limits": {
+ "MaxConnections": 3,
+ "MaxPendingRequests": 4,
+ "MaxConcurrentRequests": 5
+ },
+ "PassiveHealthCheck": {
+ "MaxFailures": 5,
+ "Interval": "4s"
+ }
+ }
}
}
`,
@@ -492,6 +612,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway: api.MeshGatewayConfig{
Mode: api.MeshGatewayModeRemote,
},
+ Connect: api.ConnectConfiguration{
+ UpstreamConfigs: map[string]api.UpstreamConfig{
+ "redis": {
+ PassiveHealthCheck: api.PassiveHealthCheck{
+ MaxFailures: 3,
+ Interval: 2 * time.Second,
+ },
+ },
+ "finance/billing": {
+ MeshGateway: api.MeshGatewayConfig{
+ Mode: "remote",
+ },
+ },
+ },
+ UpstreamDefaults: api.UpstreamConfig{
+ ClusterJSON: "zip",
+ ListenerJSON: "zop",
+ Protocol: "http",
+ ConnectTimeoutMs: 5000,
+ Limits: api.UpstreamLimits{
+ MaxConnections: 3,
+ MaxPendingRequests: 4,
+ MaxConcurrentRequests: 5,
+ },
+ PassiveHealthCheck: api.PassiveHealthCheck{
+ MaxFailures: 5,
+ Interval: 4 * time.Second,
+ },
+ },
+ },
},
},
{