@ -10,6 +10,7 @@ import (
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_matcher_v3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
@ -31,6 +32,7 @@ import (
const (
meshGatewayExportedClusterNamePrefix = "exported~"
failoverClusterNamePrefix = "failover-target~"
)
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
@ -1008,180 +1010,174 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
continue
}
failover := node . Resolver . Failover
targetID := node . Resolver . Target
target := chain . Targets [ targetID ]
// These variables are prefixed with primary to avoid shaddowing bugs.
primaryTargetID := node . Resolver . Target
primaryTarget := chain . Targets [ primaryTargetID ]
primaryClusterName := CustomizeClusterName ( primaryTarget . Name , chain )
if forMeshGateway {
primaryClusterName = meshGatewayExportedClusterNamePrefix + primaryClusterName
}
if forMeshGateway && ! cfgSnap . Locality . Matches ( target . Datacenter , target . Partition ) {
if forMeshGateway && ! cfgSnap . Locality . Matches ( primaryT arget. Datacenter , primaryT arget. Partition ) {
s . Logger . Warn ( "ignoring discovery chain target that crosses a datacenter or partition boundary in a mesh gateway" ,
"target" , target ,
"target" , primaryT arget,
"gatewayLocality" , cfgSnap . Locality ,
)
continue
}
// Determine if we have to generate the entire cluster differently.
failoverThroughMeshGateway := chain . WillFailoverThroughMeshGateway ( node ) && ! forMeshGateway
type targetClusterOptions struct {
targetID string
clusterName string
}
sni := target . SNI
clusterName := CustomizeClusterName ( target . Name , chain )
if forMeshGateway {
clusterName = meshGatewayExportedClusterNamePrefix + clusterName
}
// Get the SpiffeID for upstream SAN validation.
//
// For imported services the SpiffeID is embedded in the proxy instances.
// Whereas for local services we can construct the SpiffeID from the chain target.
var targetSpiffeID string
var additionalSpiffeIDs [ ] string
if uid . Peer != "" {
for _ , e := range chainEndpoints [ targetID ] {
targetSpiffeID = e . Service . Connect . PeerMeta . SpiffeID [ 0 ]
additionalSpiffeIDs = e . Service . Connect . PeerMeta . SpiffeID [ 1 : ]
// Only grab the first instance because it is the same for all instances.
break
// Construct the information required to make target clusters. When
// failover is configured, create the aggregate cluster.
var targetClustersOptions [ ] targetClusterOptions
if failover != nil && ! forMeshGateway {
var failoverClusterNames [ ] string
for _ , tid := range append ( [ ] string { primaryTargetID } , failover . Targets ... ) {
target := chain . Targets [ tid ]
clusterName := CustomizeClusterName ( target . Name , chain )
clusterName = failoverClusterNamePrefix + clusterName
targetClustersOptions = append ( targetClustersOptions , targetClusterOptions {
targetID : tid ,
clusterName : clusterName ,
} )
failoverClusterNames = append ( failoverClusterNames , clusterName )
}
} else {
targetSpiffeID = connect . SpiffeIDService {
Host : cfgSnap . Roots . TrustDomain ,
Namespace : target . Namespace ,
Partition : target . Partition ,
Datacenter : target . Datacenter ,
Service : target . Service ,
} . URI ( ) . String ( )
}
if failoverThroughMeshGateway {
actualTargetID := firstHealthyTarget (
chain . Targets ,
chainEndpoints ,
targetID ,
failover . Targets ,
)
aggregateClusterConfig , err := anypb . New ( & envoy_aggregate_cluster_v3 . ClusterConfig {
Clusters : failoverClusterNames ,
} )
if err != nil {
return nil , fmt . Errorf ( "failed to construct the aggregate cluster %q: %v" , primaryClusterName , err )
}
if actualTargetID != targetID {
actualTarget := chain . Targets [ actualTargetID ]
sni = actualTarget . SNI
c := & envoy_cluster_v3 . Cluster {
Name : primaryClusterName ,
AltStatName : primaryClusterName ,
ConnectTimeout : durationpb . New ( node . Resolver . ConnectTimeout ) ,
LbPolicy : envoy_cluster_v3 . Cluster_CLUSTER_PROVIDED ,
ClusterDiscoveryType : & envoy_cluster_v3 . Cluster_ClusterType {
ClusterType : & envoy_cluster_v3 . Cluster_CustomClusterType {
Name : "envoy.clusters.aggregate" ,
TypedConfig : aggregateClusterConfig ,
} ,
} ,
}
}
spiffeIDs := append ( [ ] string { targetSpiffeID } , additionalSpiffeIDs ... )
seenIDs := map [ string ] struct { } {
targetSpiffeID : { } ,
out = append ( out , c )
} else {
targetClustersOptions = append ( targetClustersOptions , targetClusterOptions {
targetID : primaryTargetID ,
clusterName : primaryClusterName ,
} )
}
if failover != nil {
// When failovers are present we need to add them as valid SANs to validate against.
// Envoy makes the failover decision independently based on the endpoint health it has available.
for _ , tid := range failover . Targets {
target , ok := chain . Targets [ tid ]
if ! ok {
continue
}
id := connect . SpiffeIDService {
Host : cfgSnap . Roots . TrustDomain ,
Namespace : target . Namespace ,
Partition : target . Partition ,
Datacenter : target . Datacenter ,
Service : target . Service ,
} . URI ( ) . String ( )
// Failover targets might be subsets of the same service, so these are deduplicated.
if _ , ok := seenIDs [ id ] ; ok {
continue
}
seenIDs [ id ] = struct { } { }
// Construct the target clusters.
for _ , targetInfo := range targetClustersOptions {
target := chain . Targets [ targetInfo . targetID ]
sni := target . SNI
var additionalSpiffeIDs [ ] string
spiffeIDs = append ( spiffeIDs , id )
targetSpiffeID := connect . SpiffeIDService {
Host : cfgSnap . Roots . TrustDomain ,
Namespace : target . Namespace ,
Partition : target . Partition ,
Datacenter : target . Datacenter ,
Service : target . Service ,
} . URI ( ) . String ( )
if uid . Peer != "" {
return nil , fmt . Errorf ( "impossible to get a peer discovery chain" )
}
}
sort . Strings ( spiffeIDs )
s . Logger . Trace ( "generating cluster for" , "cluster" , clusterName )
c := & envoy_cluster_v3 . Cluster {
Name : clusterName ,
AltStatName : clusterName ,
ConnectTimeout : durationpb . New ( node . Resolver . ConnectTimeout ) ,
ClusterDiscoveryType : & envoy_cluster_v3 . Cluster_Type { Type : envoy_cluster_v3 . Cluster_EDS } ,
CommonLbConfig : & envoy_cluster_v3 . Cluster_CommonLbConfig {
HealthyPanicThreshold : & envoy_type_v3 . Percent {
Value : 0 , // disable panic threshold
s . Logger . Trace ( "generating cluster for" , "cluster" , targetInfo . clusterName )
c := & envoy_cluster_v3 . Cluster {
Name : targetInfo . clusterName ,
AltStatName : targetInfo . clusterName ,
ConnectTimeout : durationpb . New ( node . Resolver . ConnectTimeout ) ,
ClusterDiscoveryType : & envoy_cluster_v3 . Cluster_Type { Type : envoy_cluster_v3 . Cluster_EDS } ,
CommonLbConfig : & envoy_cluster_v3 . Cluster_CommonLbConfig {
HealthyPanicThreshold : & envoy_type_v3 . Percent {
Value : 0 , // disable panic threshold
} ,
} ,
} ,
EdsCluster Config : & envoy_cluster _v3 . Cluster_EdsClusterC onfig {
EdsConfig : & envoy_core_v3 . ConfigSource {
ResourceApiVersion : envoy_core_v3 . ApiVersion_V3 ,
ConfigSourceSpecifier : & envoy_core_v3 . ConfigSource_Ads {
Ads : & envoy_core_v3 . AggregatedConfigSource { } ,
EdsClusterConfig : & envoy_cluster_v3 . Cluster_EdsClusterConfig {
EdsConfig : & envoy_core _v3 . ConfigSource {
ResourceApiVersion : envoy_core_v3 . ApiVersion_V3 ,
ConfigSourceSpecifier : & envoy_core_v3 . ConfigSource_Ads {
Ads : & envoy_core_v3 . Aggregated ConfigSource{ } ,
} ,
} ,
} ,
} ,
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers : & envoy_cluster_v3 . CircuitBreakers {
Thresholds : makeThresholdsIfNeeded ( cfg . Limits ) ,
} ,
OutlierDetection : ToOutlierDetection ( cfg . PassiveHealthCheck ) ,
}
// TODO(peering): make circuit breakers or outlier detection work?
CircuitBreakers : & envoy_cluster_v3 . CircuitBreakers {
Thresholds : makeThresholdsIfNeeded ( cfg . Limits ) ,
} ,
OutlierDetection : ToOutlierDetection ( cfg . PassiveHealthCheck ) ,
}
var lb * structs . LoadBalancer
if node . LoadBalancer != nil {
lb = node . LoadBalancer
}
if err := injectLBToCluster ( lb , c ) ; err != nil {
return nil , fmt . Errorf ( "failed to apply load balancer configuration to cluster %q: %v" , clusterName , err )
}
var lb * structs . LoadBalancer
if node . LoadBalancer != nil {
lb = node . LoadBalancer
}
if err := injectLBToCluster ( lb , c ) ; err != nil {
return nil , fmt . Errorf ( "failed to apply load balancer configuration to cluster %q: %v" , targetInfo . clusterName , err )
}
var proto string
if ! forMeshGateway {
proto = cfg . Protocol
}
if proto == "" {
proto = chain . Protocol
}
var proto string
if ! forMeshGateway {
proto = cfg . Protocol
}
if proto == "" {
proto = chain . Protocol
}
if proto == "" {
proto = "tcp"
}
if proto == "" {
proto = "tcp"
}
if proto == "http2" || proto == "grpc" {
if err := s . setHttp2ProtocolOptions ( c ) ; err != nil {
return nil , err
if proto == "http2" || proto == "grpc" {
if err := s . setHttp2ProtocolOptions ( c ) ; err != nil {
return nil , err
}
}
}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs . IsProtocolHTTPLike ( proto )
}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs . IsProtocolHTTPLike ( proto )
}
if configureTLS {
commonTLSContext := makeCommonTLSContext (
cfgSnap . Leaf ( ) ,
cfgSnap . RootPEMs ( ) ,
makeTLSParametersFromProxyTLSConfig ( cfgSnap . MeshConfigTLSOutgoing ( ) ) ,
)
if configureTLS {
commonTLSContext := makeCommonTLSContext (
cfgSnap . Leaf ( ) ,
cfgSnap . RootPEMs ( ) ,
makeTLSParametersFromProxyTLSConfig ( cfgSnap . MeshConfigTLSOutgoing ( ) ) ,
)
err = injectSANMatcher ( commonTLSContext , spiffeIDs ... )
if err != nil {
return nil , fmt . Errorf ( "failed to inject SAN matcher rules for cluster %q: %v" , sni , err )
}
spiffeIDs := append ( [ ] string { targetSpiffeID } , additionalSpiffeIDs ... )
sort . Strings ( spiffeIDs )
err = injectSANMatcher ( commonTLSContext , spiffeIDs ... )
if err != nil {
return nil , fmt . Errorf ( "failed to inject SAN matcher rules for cluster %q: %v" , sni , err )
}
tlsContext := & envoy_tls_v3 . UpstreamTlsContext {
CommonTlsContext : commonTLSContext ,
Sni : sni ,
}
transportSocket , err := makeUpstreamTLSTransportSocket ( tlsContext )
if err != nil {
return nil , err
tlsContext := & envoy_tls_v3 . UpstreamTlsContext {
CommonTlsContext : commonTLSContext ,
Sni : sni ,
}
transportSocket , err := makeUpstreamTLSTransportSocket ( tlsContext )
if err != nil {
return nil , err
}
c . TransportSocket = transportSocket
}
c . TransportSocket = transportSocket
}
out = append ( out , c )
out = append ( out , c )
}
}
if escapeHatchCluster != nil {