Create new types for service-defaults upstream cfg

pull/9872/head
freddygv 2021-03-08 22:10:27 -07:00
parent 35daee45bc
commit 87cde19b4c
10 changed files with 752 additions and 122 deletions

View File

@ -418,8 +418,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1, "foo": 1,
"protocol": "http", "protocol": "http",
}, },
UpstreamIDConfigs: structs.UpstreamConfigs{ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.UpstreamConfig{ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil), Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{ Config: map[string]interface{}{
"protocol": "tcp", "protocol": "tcp",
@ -459,8 +459,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
"foo": 1, "foo": 1,
"protocol": "http", "protocol": "http",
}, },
UpstreamIDConfigs: structs.UpstreamConfigs{ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.UpstreamConfig{ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil), Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{ Config: map[string]interface{}{
"protocol": "tcp", "protocol": "tcp",
@ -634,8 +634,8 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
"foo": 1, "foo": 1,
"protocol": "http", "protocol": "http",
}, },
UpstreamIDConfigs: structs.UpstreamConfigs{ UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
structs.UpstreamConfig{ structs.OpaqueUpstreamConfig{
Upstream: structs.NewServiceID("redis", nil), Upstream: structs.NewServiceID("redis", nil),
Config: map[string]interface{}{ Config: map[string]interface{}{
"protocol": "tcp", "protocol": "tcp",

View File

@ -87,11 +87,7 @@ type ServiceConfigEntry struct {
ExternalSNI string `json:",omitempty" alias:"external_sni"` ExternalSNI string `json:",omitempty" alias:"external_sni"`
// TODO(banks): enable this once we have upstreams supported too. Enabling Connect *ConnectConfiguration `json:",omitempty"`
// sidecars actually makes no sense and adds complications when you don't
// allow upstreams to be specified centrally too.
//
// Connect ConnectConfiguration
Meta map[string]string `json:",omitempty"` Meta map[string]string `json:",omitempty"`
EnterpriseMeta `hcl:",squash" mapstructure:",squash"` EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@ -131,13 +127,20 @@ func (e *ServiceConfigEntry) Normalize() error {
e.Kind = ServiceDefaults e.Kind = ServiceDefaults
e.Protocol = strings.ToLower(e.Protocol) e.Protocol = strings.ToLower(e.Protocol)
e.Connect.Normalize()
e.EnterpriseMeta.Normalize() e.EnterpriseMeta.Normalize()
return nil return nil
} }
func (e *ServiceConfigEntry) Validate() error { func (e *ServiceConfigEntry) Validate() error {
return validateConfigEntryMeta(e.Meta) validationErr := validateConfigEntryMeta(e.Meta)
if err := e.Connect.Validate(); err != nil {
validationErr = multierror.Append(validationErr, err)
}
return validationErr
} }
func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool { func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool {
@ -169,7 +172,38 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *EnterpriseMeta {
} }
type ConnectConfiguration struct { type ConnectConfiguration struct {
SidecarProxy bool // UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
UpstreamConfigs map[string]*UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
// UpstreamDefaults contains default configuration for all upstreams of a given service
UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
}
func (cfg *ConnectConfiguration) Normalize() {
if cfg == nil {
return
}
for _, v := range cfg.UpstreamConfigs {
v.Normalize()
}
cfg.UpstreamDefaults.Normalize()
}
func (cfg ConnectConfiguration) Validate() error {
var validationErr error
for k, v := range cfg.UpstreamConfigs {
if err := v.Validate(); err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream config for %s: %v", k, err))
}
}
if err := cfg.UpstreamDefaults.Validate(); err != nil {
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
}
return validationErr
} }
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. // ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
@ -592,13 +626,125 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
} }
type UpstreamConfig struct { type UpstreamConfig struct {
// ListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ListenerJSON string `json:",omitempty" alias:"listener_json"`
// ClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ClusterJSON string `alias:"cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `alias:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits UpstreamLimits
// PassiveHealthCheck configuration determines how upstream proxy instances will
// be monitored for removal from the load balancing pool.
PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
// MeshGatewayConfig controls how Mesh Gateways are configured and used
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
}
func (cfg *UpstreamConfig) Normalize() {
if cfg.Protocol == "" {
cfg.Protocol = "tcp"
} else {
cfg.Protocol = strings.ToLower(cfg.Protocol)
}
if cfg.ConnectTimeoutMs < 1 {
cfg.ConnectTimeoutMs = 5000
}
}
func (cfg UpstreamConfig) Validate() error {
var validationErr error
if err := cfg.PassiveHealthCheck.Validate(); err != nil {
validationErr = multierror.Append(validationErr, err)
}
if err := cfg.Limits.Validate(); err != nil {
validationErr = multierror.Append(validationErr, err)
}
return validationErr
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `alias:"max_failures"`
}
func (chk PassiveHealthCheck) Validate() error {
if chk.Interval <= 0*time.Second {
return fmt.Errorf("passive health check interval must be greater than 0s")
}
return nil
}
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections *int `alias:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests *int `alias:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests *int `alias:"max_concurrent_requests"`
}
func (ul UpstreamLimits) Validate() error {
if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
return fmt.Errorf("max connections must be at least 0")
}
if ul.MaxPendingRequests != nil && *ul.MaxPendingRequests <= 0 {
return fmt.Errorf("max pending requests must be at least 0")
}
if ul.MaxConcurrentRequests != nil && *ul.MaxConcurrentRequests <= 0 {
return fmt.Errorf("max concurrent requests must be at least 0")
}
return nil
}
type OpaqueUpstreamConfig struct {
Upstream ServiceID Upstream ServiceID
Config map[string]interface{} Config map[string]interface{}
} }
type UpstreamConfigs []UpstreamConfig type OpaqueUpstreamConfigs []OpaqueUpstreamConfig
func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) { func (configs OpaqueUpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
for _, usconf := range configs { for _, usconf := range configs {
if usconf.Upstream.Matches(sid) { if usconf.Upstream.Matches(sid) {
return usconf.Config, true return usconf.Config, true
@ -611,7 +757,7 @@ func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[stri
type ServiceConfigResponse struct { type ServiceConfigResponse struct {
ProxyConfig map[string]interface{} ProxyConfig map[string]interface{}
UpstreamConfigs map[string]map[string]interface{} UpstreamConfigs map[string]map[string]interface{}
UpstreamIDConfigs UpstreamConfigs UpstreamIDConfigs OpaqueUpstreamConfigs
MeshGateway MeshGatewayConfig `json:",omitempty"` MeshGateway MeshGatewayConfig `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"` Expose ExposeConfig `json:",omitempty"`
QueryMeta QueryMeta

View File

@ -112,6 +112,33 @@ func TestDecodeConfigEntry(t *testing.T) {
mesh_gateway { mesh_gateway {
mode = "remote" mode = "remote"
} }
connect {
upstream_configs {
redis {
passive_health_check {
interval = "2s"
max_failures = 3
}
}
"finance/billing" {
mesh_gateway {
mode = "remote"
}
}
}
upstream_defaults {
connect_timeout_ms = 5
protocol = "http"
listener_json = "foo"
cluster_json = "bar"
limits {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
}
}
`, `,
camel: ` camel: `
Kind = "service-defaults" Kind = "service-defaults"
@ -125,6 +152,33 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway { MeshGateway {
Mode = "remote" Mode = "remote"
} }
Connect {
UpstreamConfigs {
"redis" {
PassiveHealthCheck {
MaxFailures = 3
Interval = "2s"
}
}
"finance/billing" {
MeshGateway {
Mode = "remote"
}
}
}
UpstreamDefaults {
ListenerJSON = "foo"
ClusterJSON = "bar"
ConnectTimeoutMs = 5
Protocol = "http"
Limits {
MaxConnections = 3
MaxPendingRequests = 4
MaxConcurrentRequests = 5
}
}
}
`, `,
expect: &ServiceConfigEntry{ expect: &ServiceConfigEntry{
Kind: "service-defaults", Kind: "service-defaults",
@ -138,6 +192,30 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{ MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote, Mode: MeshGatewayModeRemote,
}, },
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
PassiveHealthCheck: PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
UpstreamDefaults: &UpstreamConfig{
ListenerJSON: "foo",
ClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
},
},
}, },
}, },
{ {
@ -1330,7 +1408,199 @@ func TestConfigEntryResponseMarshalling(t *testing.T) {
} }
} }
func TestPassiveHealthCheck_Validate(t *testing.T) {
tt := []struct {
name string
input PassiveHealthCheck
wantErr bool
wantMsg string
}{
{
name: "valid-interval",
input: PassiveHealthCheck{Interval: 2 * time.Second},
wantErr: false,
},
{
name: "negative-interval",
input: PassiveHealthCheck{Interval: -1 * time.Second},
wantErr: true,
wantMsg: "greater than 0s",
},
{
name: "zero-interval",
input: PassiveHealthCheck{Interval: 0 * time.Second},
wantErr: true,
wantMsg: "greater than 0s",
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Validate()
if err == nil {
require.False(t, tc.wantErr)
return
}
require.Contains(t, err.Error(), tc.wantMsg)
})
}
}
func TestUpstreamLimits_Validate(t *testing.T) {
tt := []struct {
name string
input UpstreamLimits
wantErr bool
wantMsg string
}{
{
name: "valid-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-conns",
input: UpstreamLimits{MaxConnections: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "valid-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-concurrent",
input: UpstreamLimits{MaxConcurrentRequests: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "valid-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(1)},
wantErr: false,
},
{
name: "zero-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(0)},
wantErr: true,
wantMsg: "at least 0",
},
{
name: "negative-max-pending",
input: UpstreamLimits{MaxPendingRequests: intPointer(-1)},
wantErr: true,
wantMsg: "at least 0",
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Validate()
if err == nil {
require.False(t, tc.wantErr)
return
}
require.Contains(t, err.Error(), tc.wantMsg)
})
}
}
func TestServiceConfigEntry_Normalize(t *testing.T) {
tt := []struct {
name string
input ServiceConfigEntry
expect ServiceConfigEntry
}{
{
name: "fill-in-kind",
input: ServiceConfigEntry{
Name: "web",
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
},
},
{
name: "lowercase-protocol",
input: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Protocol: "PrOtoCoL",
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Protocol: "protocol",
},
},
{
name: "connect-kitchen-sink",
input: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
Protocol: "TcP",
},
"memcached": {
ConnectTimeoutMs: -1,
},
},
UpstreamDefaults: &UpstreamConfig{ConnectTimeoutMs: -20},
},
},
expect: ServiceConfigEntry{
Kind: ServiceDefaults,
Name: "web",
Connect: &ConnectConfiguration{
UpstreamConfigs: map[string]*UpstreamConfig{
"redis": {
Protocol: "tcp",
ConnectTimeoutMs: 5000,
},
"memcached": {
Protocol: "tcp",
ConnectTimeoutMs: 5000,
},
},
UpstreamDefaults: &UpstreamConfig{
Protocol: "tcp",
ConnectTimeoutMs: 5000,
},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
err := tc.input.Normalize()
require.NoError(t, err)
require.Equal(t, tc.expect, tc.input)
})
}
}
func requireContainsLower(t *testing.T, haystack, needle string) { func requireContainsLower(t *testing.T, haystack, needle string) {
t.Helper() t.Helper()
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle)) require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))
} }
func intPointer(i int) *int {
return &i
}

View File

@ -416,7 +416,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{ CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits), Thresholds: makeThresholdsIfNeeded(cfg.Limits),
}, },
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(), OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
} }
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" { if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{} c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{}
@ -524,7 +524,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{ CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(cfg.Limits), Thresholds: makeThresholdsIfNeeded(cfg.Limits),
}, },
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(), OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
} }
var lb *structs.LoadBalancer var lb *structs.LoadBalancer
@ -734,8 +734,8 @@ func (s *Server) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts gatewayC
return cluster return cluster
} }
func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds { func makeThresholdsIfNeeded(limits structs.UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
var empty UpstreamLimits var empty structs.UpstreamLimits
// Make sure to not create any thresholds when passed the zero-value in order // Make sure to not create any thresholds when passed the zero-value in order
// to rely on Envoy defaults // to rely on Envoy defaults
if limits == empty { if limits == empty {
@ -743,6 +743,7 @@ func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBr
} }
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{} threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
// Likewise, make sure to not set any threshold values on the zero-value in // Likewise, make sure to not set any threshold values on the zero-value in
// order to rely on Envoy defaults // order to rely on Envoy defaults
if limits.MaxConnections != nil { if limits.MaxConnections != nil {

View File

@ -1,10 +1,8 @@
package xds package xds
import ( import (
"strings"
"time"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
"strings"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
@ -150,74 +148,10 @@ func ParseGatewayConfig(m map[string]interface{}) (GatewayConfig, error) {
return cfg, err return cfg, err
} }
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections *int `mapstructure:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests *int `mapstructure:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests *int `mapstructure:"max_concurrent_requests"`
}
// UpstreamConfig describes the keys we understand from
// Connect.Proxy.Upstream[*].Config.
type UpstreamConfig struct {
// ListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ListenerJSON string `mapstructure:"envoy_listener_json"`
// ClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ClusterJSON string `mapstructure:"envoy_cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string `mapstructure:"protocol"`
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits UpstreamLimits `mapstructure:"limits"`
// PassiveHealthCheck configuration
PassiveHealthCheck PassiveHealthCheck `mapstructure:"passive_health_check"`
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `mapstructure:"max_failures"`
}
// Return an envoy.OutlierDetection populated by the values from this struct. // Return an envoy.OutlierDetection populated by the values from this struct.
// If all values are zero a default empty OutlierDetection will be returned to // If all values are zero a default empty OutlierDetection will be returned to
// enable outlier detection with default values. // enable outlier detection with default values.
func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetection { func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierDetection {
od := &envoy_cluster_v3.OutlierDetection{} od := &envoy_cluster_v3.OutlierDetection{}
if p.Interval != 0 { if p.Interval != 0 {
od.Interval = ptypes.DurationProto(p.Interval) od.Interval = ptypes.DurationProto(p.Interval)
@ -228,8 +162,8 @@ func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetect
return od return od
} }
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) { func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) {
var cfg UpstreamConfig var cfg structs.UpstreamConfig
config := &mapstructure.DecoderConfig{ config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc( DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice, decode.HookWeakDecodeFromSlice,
@ -252,16 +186,11 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map. // ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default // If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error. // config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) { func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m) cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned) // Set defaults (even if error is returned)
if cfg.Protocol == "" { cfg.Normalize()
cfg.Protocol = "tcp"
} else {
cfg.Protocol = strings.ToLower(cfg.Protocol)
}
if cfg.ConnectTimeoutMs < 1 {
cfg.ConnectTimeoutMs = 5000
}
return cfg, err return cfg, err
} }

View File

@ -172,12 +172,12 @@ func TestParseUpstreamConfig(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
input map[string]interface{} input map[string]interface{}
want UpstreamConfig want structs.UpstreamConfig
}{ }{
{ {
name: "defaults - nil", name: "defaults - nil",
input: nil, input: nil,
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -185,7 +185,7 @@ func TestParseUpstreamConfig(t *testing.T) {
{ {
name: "defaults - empty", name: "defaults - empty",
input: map[string]interface{}{}, input: map[string]interface{}{},
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -196,7 +196,7 @@ func TestParseUpstreamConfig(t *testing.T) {
"foo": "bar", "foo": "bar",
"envoy_foo": "envoy_bar", "envoy_foo": "envoy_bar",
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -206,7 +206,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{ input: map[string]interface{}{
"protocol": "http", "protocol": "http",
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "http", Protocol: "http",
}, },
@ -216,7 +216,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{ input: map[string]interface{}{
"connect_timeout_ms": "1000", "connect_timeout_ms": "1000",
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000, ConnectTimeoutMs: 1000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -226,7 +226,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{ input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0), "connect_timeout_ms": float64(1000.0),
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000, ConnectTimeoutMs: 1000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -236,7 +236,7 @@ func TestParseUpstreamConfig(t *testing.T) {
input: map[string]interface{}{ input: map[string]interface{}{
"connect_timeout_ms": 1000, "connect_timeout_ms": 1000,
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000, ConnectTimeoutMs: 1000,
Protocol: "tcp", Protocol: "tcp",
}, },
@ -250,10 +250,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_concurrent_requests": 70, "max_concurrent_requests": 70,
}, },
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
Limits: UpstreamLimits{ Limits: structs.UpstreamLimits{
MaxConnections: intPointer(50), MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60), MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70), MaxConcurrentRequests: intPointer(70),
@ -269,10 +269,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_concurrent_requests": 0, "max_concurrent_requests": 0,
}, },
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
Limits: UpstreamLimits{ Limits: structs.UpstreamLimits{
MaxConnections: intPointer(0), MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0), MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0), MaxConcurrentRequests: intPointer(0),
@ -287,10 +287,10 @@ func TestParseUpstreamConfig(t *testing.T) {
"max_failures": 7, "max_failures": 7,
}, },
}, },
want: UpstreamConfig{ want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000, ConnectTimeoutMs: 5000,
Protocol: "tcp", Protocol: "tcp",
PassiveHealthCheck: PassiveHealthCheck{ PassiveHealthCheck: structs.PassiveHealthCheck{
Interval: 22 * time.Second, Interval: 22 * time.Second,
MaxFailures: 7, MaxFailures: 7,
}, },

View File

@ -1071,9 +1071,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
return l, nil return l, nil
} }
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig { func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
var ( var (
cfg UpstreamConfig cfg structs.UpstreamConfig
err error err error
) )

View File

@ -91,12 +91,88 @@ type ExposePath struct {
ParsedFromCheck bool ParsedFromCheck bool
} }
type ConnectConfiguration struct {
// UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
// UpstreamDefaults contains default configuration for all upstreams of a given service
UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
}
type UpstreamConfig struct {
// ListenerJSON is a complete override ("escape hatch") for the upstream's
// listener.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ListenerJSON string `json:",omitempty" alias:"listener_json"`
// ClusterJSON is a complete override ("escape hatch") for the upstream's
// cluster. The Connect client TLS certificate and context will be injected
// overriding any TLS settings present.
//
// Note: This escape hatch is NOT compatible with the discovery chain and
// will be ignored if a discovery chain is active.
ClusterJSON string `alias:"cluster_json"`
// Protocol describes the upstream's service protocol. Valid values are "tcp",
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
// aware features like per-request metrics and connection pooling, tracing,
// routing etc.
Protocol string
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
ConnectTimeoutMs int `alias:"connect_timeout_ms"`
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
// service instance.
Limits UpstreamLimits
// PassiveHealthCheck configuration determines how upstream proxy instances will
// be monitored for removal from the load balancing pool.
PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
// MeshGatewayConfig controls how Mesh Gateways are configured and used
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
}
type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool.
Interval time.Duration
// MaxFailures is the count of consecutive failures that results in a host
// being removed from the pool.
MaxFailures uint32 `alias:"max_failures"`
}
// UpstreamLimits describes the limits that are associated with a specific
// upstream of a service instance.
type UpstreamLimits struct {
// MaxConnections is the maximum number of connections the local proxy can
// make to the upstream service.
MaxConnections int `alias:"max_connections"`
// MaxPendingRequests is the maximum number of requests that will be queued
// waiting for an available connection. This is mostly applicable to HTTP/1.1
// clusters since all HTTP/2 requests are streamed over a single
// connection.
MaxPendingRequests int `alias:"max_pending_requests"`
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
MaxConcurrentRequests int `alias:"max_concurrent_requests"`
}
type ServiceConfigEntry struct { type ServiceConfigEntry struct {
Kind string Kind string
Name string Name string
Namespace string `json:",omitempty"` Namespace string `json:",omitempty"`
Protocol string `json:",omitempty"` Protocol string `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"` MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Connect ConnectConfiguration `json:",omitempty"`
Expose ExposeConfig `json:",omitempty"` Expose ExposeConfig `json:",omitempty"`
ExternalSNI string `json:",omitempty" alias:"external_sni"` ExternalSNI string `json:",omitempty" alias:"external_sni"`
Meta map[string]string `json:",omitempty"` Meta map[string]string `json:",omitempty"`

View File

@ -332,6 +332,36 @@ func TestDecodeConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123", "ExternalSNI": "abc-123",
"MeshGateway": { "MeshGateway": {
"Mode": "remote" "Mode": "remote"
},
"Connect": {
"UpstreamConfigs": {
"redis": {
"PassiveHealthCheck": {
"MaxFailures": 3,
"Interval": "2s"
}
},
"finance/billing": {
"MeshGateway": {
"Mode": "remote"
}
}
},
"UpstreamDefaults": {
"ClusterJSON": "zip",
"ListenerJSON": "zop",
"ConnectTimeoutMs": 5000,
"Protocol": "http",
"Limits": {
"MaxConnections": 3,
"MaxPendingRequests": 4,
"MaxConcurrentRequests": 5
},
"PassiveHealthCheck": {
"MaxFailures": 5,
"Interval": "4s"
}
}
} }
} }
`, `,
@ -347,6 +377,34 @@ func TestDecodeConfigEntry(t *testing.T) {
MeshGateway: MeshGatewayConfig{ MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote, Mode: MeshGatewayModeRemote,
}, },
Connect: ConnectConfiguration{
UpstreamConfigs: map[string]UpstreamConfig{
"redis": {
PassiveHealthCheck: PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: MeshGatewayConfig{Mode: "remote"},
},
},
UpstreamDefaults: UpstreamConfig{
ClusterJSON: "zip",
ListenerJSON: "zop",
Protocol: "http",
ConnectTimeoutMs: 5000,
Limits: UpstreamLimits{
MaxConnections: 3,
MaxPendingRequests: 4,
MaxConcurrentRequests: 5,
},
PassiveHealthCheck: PassiveHealthCheck{
MaxFailures: 5,
Interval: 4 * time.Second,
},
},
},
}, },
}, },
{ {

View File

@ -423,7 +423,7 @@ func TestParseConfigEntry(t *testing.T) {
}, },
}, },
{ {
name: "service-defaults", name: "service-defaults: kitchen sink",
snake: ` snake: `
kind = "service-defaults" kind = "service-defaults"
name = "main" name = "main"
@ -436,6 +436,36 @@ func TestParseConfigEntry(t *testing.T) {
mesh_gateway { mesh_gateway {
mode = "remote" mode = "remote"
} }
connect {
upstream_configs {
"redis" {
passive_health_check {
max_failures = 3
interval = "2s"
}
}
"finance/billing" {
mesh_gateway {
mode = "remote"
}
}
}
upstream_defaults {
cluster_json = "zip"
listener_json = "zop"
connect_timeout_ms = 5000
protocol = "http"
limits {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
passive_health_check {
max_failures = 5
interval = "4s"
}
}
}
`, `,
camel: ` camel: `
Kind = "service-defaults" Kind = "service-defaults"
@ -449,6 +479,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway { MeshGateway {
Mode = "remote" Mode = "remote"
} }
connect = {
upstream_configs = {
"redis" = {
passive_health_check = {
max_failures = 3
interval = "2s"
}
}
"finance/billing" = {
mesh_gateway = {
mode = "remote"
}
}
}
upstream_defaults = {
cluster_json = "zip"
listener_json = "zop"
connect_timeout_ms = 5000
protocol = "http"
limits = {
max_connections = 3
max_pending_requests = 4
max_concurrent_requests = 5
}
passive_health_check = {
max_failures = 5
interval = "4s"
}
}
}
`, `,
snakeJSON: ` snakeJSON: `
{ {
@ -462,6 +522,36 @@ func TestParseConfigEntry(t *testing.T) {
"external_sni": "abc-123", "external_sni": "abc-123",
"mesh_gateway": { "mesh_gateway": {
"mode": "remote" "mode": "remote"
},
"connect": {
"upstream_configs": {
"redis": {
"passive_health_check": {
"max_failures": 3,
"interval": "2s"
}
},
"finance/billing": {
"mesh_gateway": {
"mode": "remote"
}
}
},
"upstream_defaults": {
"cluster_json": "zip",
"listener_json": "zop",
"connect_timeout_ms": 5000,
"protocol": "http",
"limits": {
"max_connections": 3,
"max_pending_requests": 4,
"max_concurrent_requests": 5
},
"passive_health_check": {
"max_failures": 5,
"interval": "4s"
}
}
} }
} }
`, `,
@ -477,6 +567,36 @@ func TestParseConfigEntry(t *testing.T) {
"ExternalSNI": "abc-123", "ExternalSNI": "abc-123",
"MeshGateway": { "MeshGateway": {
"Mode": "remote" "Mode": "remote"
},
"Connect": {
"UpstreamConfigs": {
"redis": {
"PassiveHealthCheck": {
"MaxFailures": 3,
"Interval": "2s"
}
},
"finance/billing": {
"MeshGateway": {
"Mode": "remote"
}
}
},
"UpstreamDefaults": {
"ClusterJSON": "zip",
"ListenerJSON": "zop",
"ConnectTimeoutMs": 5000,
"Protocol": "http",
"Limits": {
"MaxConnections": 3,
"MaxPendingRequests": 4,
"MaxConcurrentRequests": 5
},
"PassiveHealthCheck": {
"MaxFailures": 5,
"Interval": "4s"
}
}
} }
} }
`, `,
@ -492,6 +612,36 @@ func TestParseConfigEntry(t *testing.T) {
MeshGateway: api.MeshGatewayConfig{ MeshGateway: api.MeshGatewayConfig{
Mode: api.MeshGatewayModeRemote, Mode: api.MeshGatewayModeRemote,
}, },
Connect: api.ConnectConfiguration{
UpstreamConfigs: map[string]api.UpstreamConfig{
"redis": {
PassiveHealthCheck: api.PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
},
"finance/billing": {
MeshGateway: api.MeshGatewayConfig{
Mode: "remote",
},
},
},
UpstreamDefaults: api.UpstreamConfig{
ClusterJSON: "zip",
ListenerJSON: "zop",
Protocol: "http",
ConnectTimeoutMs: 5000,
Limits: api.UpstreamLimits{
MaxConnections: 3,
MaxPendingRequests: 4,
MaxConcurrentRequests: 5,
},
PassiveHealthCheck: api.PassiveHealthCheck{
MaxFailures: 5,
Interval: 4 * time.Second,
},
},
},
}, },
}, },
{ {