package xds

import (
	"errors"
	"fmt"

	envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"

	"github.com/golang/protobuf/proto"
	bexpr "github.com/hashicorp/go-bexpr"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
)

const (
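	// UnnamedSubset is the subset name used to generate the cluster and
	// endpoints for a service's default/unnamed subset.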
	UnnamedSubset = ""
)

// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	if cfgSnap == nil {
		return nil, errors.New("nil config given")
	}

	switch cfgSnap.Kind {
	case structs.ServiceKindConnectProxy:
		return s.endpointsFromSnapshotConnectProxy(cfgSnap)
	case structs.ServiceKindTerminatingGateway:
		return s.endpointsFromSnapshotTerminatingGateway(cfgSnap)
	case structs.ServiceKindMeshGateway:
		return s.endpointsFromSnapshotMeshGateway(cfgSnap)
	case structs.ServiceKindIngressGateway:
		return s.endpointsFromSnapshotIngressGateway(cfgSnap)
	default:
		return nil, fmt.Errorf("invalid service kind: %v", cfgSnap.Kind)
	}
}

// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	// TODO: this estimate is wrong
	resources := make([]proto.Message, 0,
		len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+
			len(cfgSnap.ConnectProxy.PeerUpstreamEndpoints)+
			len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))

	// NOTE: Any time we skip a chain below we MUST also skip that discovery chain
	// in clusters.go so that the set of endpoints generated matches the set of clusters.
	for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
		upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]

		explicit := upstreamCfg.HasLocalPortOrSocket()
		if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
			// The discovery chain is not associated with a known explicit or implicit upstream, so it is skipped.
			continue
		}

		es := s.endpointsFromDiscoveryChain(
			uid,
			chain,
			cfgSnap.Locality,
			upstreamCfg,
			cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid],
			cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid],
		)
		resources = append(resources, es...)
	}

	// NOTE: Any time we skip an upstream below we MUST also skip that same
	// upstream in clusters.go so that the set of endpoints generated matches
	// the set of clusters.
	//
	// TODO(peering): make this work for tproxy
	for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
		upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]

		explicit := upstreamCfg.HasLocalPortOrSocket()
		if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
			// Not associated with a known explicit or implicit upstream, so it is skipped.
			continue
		}

		peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)

		// TODO(peering): if we replicated service metadata separately from the
		// instances we wouldn't have to flip/flop this cluster name like this.
		clusterName := peerMeta.PrimarySNI()
		if clusterName == "" {
			clusterName = uid.EnvoyID()
		}

		// Also skip peer instances with a hostname as their address. EDS
		// cannot resolve hostnames, so we provide them through CDS instead.
		if _, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpointsUseHostnames[uid]; ok {
			continue
		}

		endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid]
		if ok {
			la := makeLoadAssignment(
				clusterName,
				[]loadAssignmentEndpointGroup{
					{Endpoints: endpoints},
				},
				proxycfg.GatewayKey{ /*empty so it never matches*/ },
			)
			resources = append(resources, la)
		}
	}

	// Looping over explicit upstreams is only needed for prepared queries,
	// because they do not have discovery chains.
	for _, u := range cfgSnap.Proxy.Upstreams {
		if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
			continue
		}
		uid := proxycfg.NewUpstreamID(&u)

		dc := u.Datacenter
		if dc == "" {
			dc = cfgSnap.Datacenter
		}
		clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)

		endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
		if ok {
			la := makeLoadAssignment(
				clusterName,
				[]loadAssignmentEndpointGroup{
					{Endpoints: endpoints},
				},
				cfgSnap.Locality,
			)
			resources = append(resources, la)
		}
	}

	return resources, nil
}
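
// filterSubsetEndpoints filters the given endpoints down to those matching
// the subset's bexpr filter expression, if one is configured.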
func (s *ResourceGenerator) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) {
	// Locally execute the subset's filter.
	if subset.Filter != "" {
		filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
		if err != nil {
			return nil, err
		}

		raw, err := filter.Execute(endpoints)
		if err != nil {
			return nil, err
		}
		return raw.(structs.CheckServiceNodes), nil
	}
	return endpoints, nil
}
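
// endpointsFromSnapshotTerminatingGateway returns the xDS API representation
// of the endpoints for the services linked to a terminating gateway.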
func (s *ResourceGenerator) endpointsFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	return s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
}
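
// endpointsFromSnapshotMeshGateway returns the xDS API representation of the
// endpoints for a mesh gateway: the gateways in remote datacenters and
// partitions, the local Consul servers when WAN federation is enabled, and
// the service groups and subsets served through this gateway.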
func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	keys := cfgSnap.MeshGateway.GatewayKeys()
	resources := make([]proto.Message, 0, len(keys)+len(cfgSnap.MeshGateway.ServiceGroups))

	for _, key := range keys {
		if key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()) {
			continue // skip local
		}
		// Also skip gateways with a hostname as their address. EDS cannot resolve hostnames,
		// so we provide them through CDS instead.
		if len(cfgSnap.MeshGateway.HostnameDatacenters[key.String()]) > 0 {
			continue
		}

		// Mesh gateways in remote DCs are discovered in two ways:
		//
		//  1. Via an Internal.ServiceDump RPC in the remote DC (GatewayGroups).
		//  2. In the federation state that is replicated from the primary DC (FedStateGateways).
		//
		// We determine which set to use based on whichever contains the higher
		// raft ModifyIndex (and is therefore the most up-to-date).
		//
		// Previously, GatewayGroups was always given precedence over FedStateGateways,
		// but this was problematic when using mesh gateways for WAN federation.
		//
		// Consider the following example:
		//
		//  - Primary and Secondary DCs are WAN federated via local mesh gateways.
		//
		//  - The Secondary DC's mesh gateway is running on an ephemeral compute instance
		//    and is abruptly terminated and rescheduled with a *new IP address*.
		//
		//  - The Primary DC's mesh gateway is no longer able to connect to the Secondary
		//    DC, as its proxy is configured with the old IP address. Therefore any RPC
		//    from the Primary to the Secondary DC will fail (including the one to
		//    discover the gateway's new IP address).
		//
		//  - The Secondary DC performs its regular anti-entropy of federation state data
		//    to the Primary DC (this succeeds as there is still connectivity in this
		//    direction).
		//
		//  - At this point the Primary DC's mesh gateway should observe the new IP
		//    address and reconfigure its proxy; however, because we always prioritized
		//    GatewayGroups this didn't happen, and the connection remained severed.
		maxModifyIndex := func(vals structs.CheckServiceNodes) uint64 {
			var max uint64
			for _, v := range vals {
				if i := v.Service.RaftIndex.ModifyIndex; i > max {
					max = i
				}
			}
			return max
		}

		endpoints := cfgSnap.MeshGateway.GatewayGroups[key.String()]
		fedStateEndpoints := cfgSnap.MeshGateway.FedStateGateways[key.String()]

		if maxModifyIndex(fedStateEndpoints) > maxModifyIndex(endpoints) {
			endpoints = fedStateEndpoints
		}

		if len(endpoints) == 0 {
			s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", key)
			continue
		}

		{ // standard connect
			clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)

			la := makeLoadAssignment(
				clusterName,
				[]loadAssignmentEndpointGroup{
					{Endpoints: endpoints},
				},
				cfgSnap.Locality,
			)
			resources = append(resources, la)
		}

		if cfgSnap.ProxyID.InDefaultPartition() &&
			cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
			cfgSnap.ServerSNIFn != nil {

			clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
			la := makeLoadAssignment(
				clusterName,
				[]loadAssignmentEndpointGroup{
					{Endpoints: endpoints},
				},
				cfgSnap.Locality,
			)
			resources = append(resources, la)
		}
	}

	// Generate endpoints for our servers if WAN federation is enabled.
	if cfgSnap.ProxyID.InDefaultPartition() &&
		cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" &&
		cfgSnap.ServerSNIFn != nil {
		var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint

		for _, srv := range cfgSnap.MeshGateway.ConsulServers {
			clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)

			_, addr, port := srv.BestAddress(false /*wan*/)

			lbEndpoint := &envoy_endpoint_v3.LbEndpoint{
				HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
					Endpoint: &envoy_endpoint_v3.Endpoint{
						Address: makeAddress(addr, port),
					},
				},
				HealthStatus: envoy_core_v3.HealthStatus_UNKNOWN,
			}

			cla := &envoy_endpoint_v3.ClusterLoadAssignment{
				ClusterName: clusterName,
				Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
					LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{lbEndpoint},
				}},
			}
			allServersLbEndpoints = append(allServersLbEndpoints, lbEndpoint)

			resources = append(resources, cla)
		}

		// And add one catch-all so that remote datacenters can dial ANY server
		// in this datacenter without knowing its name.
		resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
			ClusterName: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, ""),
			Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
				LbEndpoints: allServersLbEndpoints,
			}},
		})
	}

	// Generate the endpoints for each service and its subsets.
	e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
	if err != nil {
		return nil, err
	}
	resources = append(resources, e...)

	return resources, nil
}
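
// endpointsFromServicesAndResolvers generates one load assignment per service
// and per subset defined on that service's resolver, including the
// default/unnamed subset.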
func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
	cfgSnap *proxycfg.ConfigSnapshot,
	services map[structs.ServiceName]structs.CheckServiceNodes,
	resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
) ([]proto.Message, error) {
	resources := make([]proto.Message, 0, len(services))

	// Generate the endpoints for the linked service groups.
	for svc, endpoints := range services {
		// Skip creating endpoints for services that have hostnames as addresses.
		// EDS cannot resolve hostnames, so we provide them through CDS instead.
		if cfgSnap.Kind == structs.ServiceKindTerminatingGateway && len(cfgSnap.TerminatingGateway.HostnameServices[svc]) > 0 {
			continue
		}

		clusterEndpoints := make(map[string][]loadAssignmentEndpointGroup)
		clusterEndpoints[UnnamedSubset] = []loadAssignmentEndpointGroup{{Endpoints: endpoints, OnlyPassing: false}}

		// Collect all of the loadAssignmentEndpointGroups for the various subsets. We do this before generating
		// the endpoints for the default/unnamed subset so that we can take into account the DefaultSubset on the
		// service-resolver, which may prevent the default/unnamed cluster from creating endpoints for all service
		// instances.
		if resolver, hasResolver := resolvers[svc]; hasResolver {
			for subsetName, subset := range resolver.Subsets {
				subsetEndpoints, err := s.filterSubsetEndpoints(&subset, endpoints)
				if err != nil {
					return nil, err
				}
				groups := []loadAssignmentEndpointGroup{{Endpoints: subsetEndpoints, OnlyPassing: subset.OnlyPassing}}
				clusterEndpoints[subsetName] = groups

				// If this subset is the default, then override the unnamed subset with this configuration.
				if subsetName == resolver.DefaultSubset {
					clusterEndpoints[UnnamedSubset] = groups
				}
			}
		}

		// Now generate the load assignment for all subsets.
		for subsetName, groups := range clusterEndpoints {
			clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
			la := makeLoadAssignment(
				clusterName,
				groups,
				cfgSnap.Locality,
			)
			resources = append(resources, la)
		}
	}

	return resources, nil
}
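
// endpointsFromSnapshotIngressGateway returns the xDS API representation of
// the endpoints for an ingress gateway's upstreams, deduplicating upstreams
// referenced by more than one listener.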
func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	var resources []proto.Message
	createdClusters := make(map[proxycfg.UpstreamID]bool)
	for _, upstreams := range cfgSnap.IngressGateway.Upstreams {
		for _, u := range upstreams {
			uid := proxycfg.NewUpstreamID(&u)

			// If we've already created endpoints for this upstream, skip it. Multiple listeners may
			// reference the same upstream, so we don't need to create duplicate endpoints in that case.
			if createdClusters[uid] {
				continue
			}

			es := s.endpointsFromDiscoveryChain(
				uid,
				cfgSnap.IngressGateway.DiscoveryChain[uid],
				proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
				&u,
				cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid],
				cfgSnap.IngressGateway.WatchedGatewayEndpoints[uid],
			)
			resources = append(resources, es...)
			createdClusters[uid] = true
		}
	}
	return resources, nil
}

// makeEndpoint returns an LbEndpoint for the given host and port; it is also
// used in clusters.go.
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
	return &envoy_endpoint_v3.LbEndpoint{
		HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
			Endpoint: &envoy_endpoint_v3.Endpoint{
				Address: makeAddress(host, port),
			},
		},
	}
}
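
// makePipeEndpoint returns an LbEndpoint that dials a unix domain socket path
// rather than a host and port.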
func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
	return &envoy_endpoint_v3.LbEndpoint{
		HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
			Endpoint: &envoy_endpoint_v3.Endpoint{
				Address: makePipeAddress(path, 0),
			},
		},
	}
}
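
// endpointsFromDiscoveryChain generates a load assignment for each resolver
// node in the compiled discovery chain, accounting for subsets, failover
// targets, and routing through mesh gateways.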
func (s *ResourceGenerator) endpointsFromDiscoveryChain(
	uid proxycfg.UpstreamID,
	chain *structs.CompiledDiscoveryChain,
	gatewayKey proxycfg.GatewayKey,
	upstream *structs.Upstream,
	upstreamEndpoints map[string]structs.CheckServiceNodes,
	gatewayEndpoints map[string]structs.CheckServiceNodes,
) []proto.Message {
	var resources []proto.Message

	if chain == nil {
		return resources
	}

	configMap := make(map[string]interface{})
	if upstream != nil {
		configMap = upstream.Config
	}
	cfg, err := structs.ParseUpstreamConfigNoDefaults(configMap)
	if err != nil {
		// Don't hard fail on a config typo, just warn. The parse func returns
		// default config if there is an error, so it's safe to continue.
		s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
	}

	var escapeHatchCluster *envoy_cluster_v3.Cluster
	if cfg.EnvoyClusterJSON != "" {
		if chain.Default {
			// If you haven't done anything to set up the discovery chain, then
			// you can use the envoy_cluster_json escape hatch.
			escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
			if err != nil {
				return resources
			}
		} else {
			s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for",
				"discovery chain", chain.ServiceName, "upstream", uid,
				"envoy_cluster_json", chain.ServiceName)
		}
	}

	// Find all resolver nodes.
	for _, node := range chain.Nodes {
		if node.Type != structs.DiscoveryGraphNodeTypeResolver {
			continue
		}
		failover := node.Resolver.Failover
		targetID := node.Resolver.Target

		target := chain.Targets[targetID]

		clusterName := CustomizeClusterName(target.Name, chain)
		if escapeHatchCluster != nil {
			clusterName = escapeHatchCluster.Name
		}
		s.Logger.Debug("generating endpoints for", "cluster", clusterName)

		// Determine if we have to generate the entire cluster differently.
		failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)

		if failoverThroughMeshGateway {
			actualTargetID := firstHealthyTarget(
				chain.Targets,
				upstreamEndpoints,
				targetID,
				failover.Targets,
			)
			if actualTargetID != targetID {
				targetID = actualTargetID
			}

			failover = nil
		}

		primaryGroup, valid := makeLoadAssignmentEndpointGroup(
			chain.Targets,
			upstreamEndpoints,
			gatewayEndpoints,
			targetID,
			gatewayKey,
		)
		if !valid {
			continue // skip the cluster if we're still populating the snapshot
		}

		var endpointGroups []loadAssignmentEndpointGroup

		if failover != nil && len(failover.Targets) > 0 {
			endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)

			endpointGroups = append(endpointGroups, primaryGroup)

			for _, failTargetID := range failover.Targets {
				failoverGroup, valid := makeLoadAssignmentEndpointGroup(
					chain.Targets,
					upstreamEndpoints,
					gatewayEndpoints,
					failTargetID,
					gatewayKey,
				)
				if !valid {
					continue // skip the failover target if we're still populating the snapshot
				}
				endpointGroups = append(endpointGroups, failoverGroup)
			}
		} else {
			endpointGroups = append(endpointGroups, primaryGroup)
		}

		la := makeLoadAssignment(
			clusterName,
			endpointGroups,
			gatewayKey,
		)
		resources = append(resources, la)
	}

	return resources
}
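
// loadAssignmentEndpointGroup is one group of endpoints for a cluster load
// assignment. OnlyPassing excludes instances whose checks are not passing,
// and OverrideHealth, when set, replaces the health derived from an
// instance's own checks (used when substituting mesh gateway endpoints).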
type loadAssignmentEndpointGroup struct {
	Endpoints      structs.CheckServiceNodes
	OnlyPassing    bool
	OverrideHealth envoy_core_v3.HealthStatus
}
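
// makeLoadAssignment converts the endpoint groups into a ClusterLoadAssignment,
// assigning each group a successive priority so that Envoy fails over to the
// next group only when the previous one has no healthy instances.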
func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
	cla := &envoy_endpoint_v3.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints:   make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)),
	}

	if len(endpointGroups) > 1 {
		cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{
			// We choose such a large value here that the failover math should
			// in effect not happen until zero instances are healthy.
			OverprovisioningFactor: makeUint32Value(100000),
		}
	}

	for priority, endpointGroup := range endpointGroups {
		endpoints := endpointGroup.Endpoints
		es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints))

		for _, ep := range endpoints {
			// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
			_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
			healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)

			if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
				healthStatus = endpointGroup.OverrideHealth
			}

			es = append(es, &envoy_endpoint_v3.LbEndpoint{
				HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
					Endpoint: &envoy_endpoint_v3.Endpoint{
						Address: makeAddress(addr, port),
					},
				},
				HealthStatus:        healthStatus,
				LoadBalancingWeight: makeUint32Value(weight),
			})
		}

		cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
			Priority:    uint32(priority),
			LbEndpoints: es,
		})
	}

	return cla
}
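
// makeLoadAssignmentEndpointGroup returns the endpoint group for a single
// discovery chain target, substituting mesh gateway endpoints when the
// target's mesh gateway mode requires them. The second return value is false
// if the snapshot does not yet contain the data needed for this target.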
func makeLoadAssignmentEndpointGroup(
	targets map[string]*structs.DiscoveryTarget,
	targetHealth map[string]structs.CheckServiceNodes,
	gatewayHealth map[string]structs.CheckServiceNodes,
	targetID string,
	localKey proxycfg.GatewayKey,
) (loadAssignmentEndpointGroup, bool) {
	realEndpoints, ok := targetHealth[targetID]
	if !ok {
		// skip the cluster if we're still populating the snapshot
		return loadAssignmentEndpointGroup{}, false
	}
	target := targets[targetID]

	var gatewayKey proxycfg.GatewayKey

	switch target.MeshGateway.Mode {
	case structs.MeshGatewayModeRemote:
		gatewayKey.Datacenter = target.Datacenter
		gatewayKey.Partition = target.Partition
	case structs.MeshGatewayModeLocal:
		gatewayKey = localKey
	}

	if gatewayKey.IsEmpty() || (acl.EqualPartitions(localKey.Partition, target.Partition) && localKey.Datacenter == target.Datacenter) {
		// Gateways are not needed if the request isn't for a remote DC or partition.
		return loadAssignmentEndpointGroup{
			Endpoints:   realEndpoints,
			OnlyPassing: target.Subset.OnlyPassing,
		}, true
	}

	// If using a mesh gateway we need to pull those endpoints instead.
	gatewayEndpoints, ok := gatewayHealth[gatewayKey.String()]
	if !ok {
		// skip the cluster if we're still populating the snapshot
		return loadAssignmentEndpointGroup{}, false
	}

	// But we will use the health from the actual backend service.
	overallHealth := envoy_core_v3.HealthStatus_UNHEALTHY
	for _, ep := range realEndpoints {
		health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
		if health == envoy_core_v3.HealthStatus_HEALTHY {
			overallHealth = envoy_core_v3.HealthStatus_HEALTHY
			break
		}
	}

	return loadAssignmentEndpointGroup{
		Endpoints:      gatewayEndpoints,
		OverrideHealth: overallHealth,
	}, true
}
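
// calculateEndpointHealthAndWeight derives an Envoy health status and load
// balancing weight from a service instance's Consul health checks and
// configured service weights.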
func calculateEndpointHealthAndWeight(
	ep structs.CheckServiceNode,
	onlyPassing bool,
) (envoy_core_v3.HealthStatus, int) {
	healthStatus := envoy_core_v3.HealthStatus_HEALTHY
	weight := 1
	if ep.Service.Weights != nil {
		weight = ep.Service.Weights.Passing
	}

	for _, chk := range ep.Checks {
		if chk.Status == api.HealthCritical {
			healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
		}
		if onlyPassing && chk.Status != api.HealthPassing {
			healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
		}
		if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
			weight = ep.Service.Weights.Warning
		}
	}
	// Make weights fit Envoy's limits. A zero weight means that either the
	// Warning (likely) or Passing (weirdly) weight has been set to 0,
	// effectively making this instance unhealthy, so it should not be sent
	// traffic.
	if weight < 1 {
		healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
		weight = 1
	}
	if weight > 128 {
		weight = 128
	}
	return healthStatus, weight
}