mirror of https://github.com/hashicorp/consul
Make locality aware routing xDS changes (#17826)
parent
500dcb1f21
commit
a3ba559149
|
@ -13,6 +13,10 @@ import (
|
|||
// DeepCopy generates a deep copy of *ConfigSnapshot
|
||||
func (o *ConfigSnapshot) DeepCopy() *ConfigSnapshot {
|
||||
var cp ConfigSnapshot = *o
|
||||
if o.ServiceLocality != nil {
|
||||
cp.ServiceLocality = new(structs.Locality)
|
||||
*cp.ServiceLocality = *o.ServiceLocality
|
||||
}
|
||||
if o.ServiceMeta != nil {
|
||||
cp.ServiceMeta = make(map[string]string, len(o.ServiceMeta))
|
||||
for k2, v2 := range o.ServiceMeta {
|
||||
|
|
|
@ -901,6 +901,7 @@ func IngressListenerKeyFromListener(l structs.IngressListener) IngressListenerKe
|
|||
type ConfigSnapshot struct {
|
||||
Kind structs.ServiceKind
|
||||
Service string
|
||||
ServiceLocality *structs.Locality
|
||||
ProxyID ProxyID
|
||||
Address string
|
||||
Port int
|
||||
|
|
|
@ -124,6 +124,7 @@ type serviceInstance struct {
|
|||
taggedAddresses map[string]structs.ServiceAddress
|
||||
proxyCfg structs.ConnectProxyConfig
|
||||
token string
|
||||
locality *structs.Locality
|
||||
}
|
||||
|
||||
func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
|
||||
|
@ -244,6 +245,7 @@ func newServiceInstanceFromNodeService(id ProxyID, ns *structs.NodeService, toke
|
|||
return serviceInstance{
|
||||
kind: ns.Kind,
|
||||
service: ns.Service,
|
||||
locality: ns.Locality,
|
||||
proxyID: id,
|
||||
address: ns.Address,
|
||||
port: ns.Port,
|
||||
|
@ -303,6 +305,7 @@ func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig)
|
|||
return ConfigSnapshot{
|
||||
Kind: s.kind,
|
||||
Service: s.service,
|
||||
ServiceLocality: s.locality,
|
||||
ProxyID: s.proxyID,
|
||||
Address: s.address,
|
||||
Port: s.port,
|
||||
|
|
|
@ -2095,6 +2095,18 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi
|
|||
return acl.Allow
|
||||
}
|
||||
|
||||
func (csn *CheckServiceNode) Locality() *Locality {
|
||||
if csn.Service != nil && csn.Service.Locality != nil {
|
||||
return csn.Service.Locality
|
||||
}
|
||||
|
||||
if csn.Node != nil && csn.Node.Locality != nil {
|
||||
return csn.Node.Locality
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type CheckServiceNodes []CheckServiceNode
|
||||
|
||||
func (csns CheckServiceNodes) DeepCopy() CheckServiceNodes {
|
||||
|
|
|
@ -135,7 +135,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
|
||||
if ok {
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: endpoints},
|
||||
},
|
||||
|
@ -158,7 +160,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid)
|
||||
if ok {
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
name,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: endpoints},
|
||||
},
|
||||
|
@ -224,7 +228,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
clusterName := connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain)
|
||||
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: endpoints},
|
||||
},
|
||||
|
@ -239,7 +245,9 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
|
||||
clusterName := cfgSnap.ServerSNIFn(key.Datacenter, "")
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: endpoints},
|
||||
},
|
||||
|
@ -409,7 +417,9 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
|
|||
for subsetName, groups := range clusterEndpoints {
|
||||
clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
groups,
|
||||
cfgSnap.Locality,
|
||||
)
|
||||
|
@ -444,7 +454,9 @@ func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices(
|
|||
groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}}
|
||||
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
groups,
|
||||
// Use an empty key here so that it never matches. This will force the mesh gateway to always
|
||||
// reference the remote mesh gateway's wan addr.
|
||||
|
@ -606,7 +618,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
|
|||
return la, nil
|
||||
}
|
||||
la = makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: localGw},
|
||||
},
|
||||
|
@ -626,7 +640,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
|
|||
return nil, nil
|
||||
}
|
||||
la = makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
nil,
|
||||
[]loadAssignmentEndpointGroup{
|
||||
{Endpoints: endpoints},
|
||||
},
|
||||
|
@ -756,7 +772,9 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
|
|||
}
|
||||
|
||||
la := makeLoadAssignment(
|
||||
cfgSnap,
|
||||
clusterName,
|
||||
ti.PrioritizeByLocality,
|
||||
[]loadAssignmentEndpointGroup{endpointGroup},
|
||||
gatewayKey,
|
||||
)
|
||||
|
@ -842,7 +860,7 @@ type loadAssignmentEndpointGroup struct {
|
|||
OverrideHealth envoy_core_v3.HealthStatus
|
||||
}
|
||||
|
||||
func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
|
||||
func makeLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, policy *structs.DiscoveryPrioritizeByLocality, endpointGroups []loadAssignmentEndpointGroup, localKey proxycfg.GatewayKey) *envoy_endpoint_v3.ClusterLoadAssignment {
|
||||
cla := &envoy_endpoint_v3.ClusterLoadAssignment{
|
||||
ClusterName: clusterName,
|
||||
Endpoints: make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)),
|
||||
|
@ -856,35 +874,46 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo
|
|||
}
|
||||
}
|
||||
|
||||
for priority, endpointGroup := range endpointGroups {
|
||||
endpoints := endpointGroup.Endpoints
|
||||
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints))
|
||||
var priority uint32
|
||||
|
||||
for _, ep := range endpoints {
|
||||
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
|
||||
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
|
||||
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
|
||||
for _, endpointGroup := range endpointGroups {
|
||||
endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints)
|
||||
|
||||
if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
|
||||
healthStatus = endpointGroup.OverrideHealth
|
||||
}
|
||||
|
||||
endpoint := &envoy_endpoint_v3.Endpoint{
|
||||
Address: makeAddress(addr, port),
|
||||
}
|
||||
es = append(es, &envoy_endpoint_v3.LbEndpoint{
|
||||
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
|
||||
Endpoint: endpoint,
|
||||
},
|
||||
HealthStatus: healthStatus,
|
||||
LoadBalancingWeight: makeUint32Value(weight),
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
|
||||
Priority: uint32(priority),
|
||||
LbEndpoints: es,
|
||||
})
|
||||
for _, endpoints := range endpointsByLocality {
|
||||
es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpointGroup.Endpoints))
|
||||
|
||||
for _, ep := range endpoints {
|
||||
// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
|
||||
_, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault()))
|
||||
healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
|
||||
|
||||
if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
|
||||
healthStatus = endpointGroup.OverrideHealth
|
||||
}
|
||||
|
||||
endpoint := &envoy_endpoint_v3.Endpoint{
|
||||
Address: makeAddress(addr, port),
|
||||
}
|
||||
es = append(es, &envoy_endpoint_v3.LbEndpoint{
|
||||
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
|
||||
Endpoint: endpoint,
|
||||
},
|
||||
HealthStatus: healthStatus,
|
||||
LoadBalancingWeight: makeUint32Value(weight),
|
||||
})
|
||||
}
|
||||
|
||||
cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
|
||||
Priority: priority,
|
||||
LbEndpoints: es,
|
||||
})
|
||||
|
||||
priority++
|
||||
}
|
||||
}
|
||||
|
||||
return cla
|
||||
|
|
|
@ -101,6 +101,7 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
clusterName string
|
||||
locality *structs.Locality
|
||||
endpoints []loadAssignmentEndpointGroup
|
||||
want *envoy_endpoint_v3.ClusterLoadAssignment
|
||||
}{
|
||||
|
@ -211,11 +212,24 @@ func Test_makeLoadAssignment(t *testing.T) {
|
|||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := makeLoadAssignment(
|
||||
&proxycfg.ConfigSnapshot{ServiceLocality: tt.locality},
|
||||
tt.clusterName,
|
||||
nil,
|
||||
tt.endpoints,
|
||||
proxycfg.GatewayKey{Datacenter: "dc1"},
|
||||
)
|
||||
require.Equal(t, tt.want, got)
|
||||
|
||||
if tt.locality == nil {
|
||||
got := makeLoadAssignment(
|
||||
&proxycfg.ConfigSnapshot{ServiceLocality: &structs.Locality{Region: "us-west-1", Zone: "us-west-1a"}},
|
||||
tt.clusterName,
|
||||
nil,
|
||||
tt.endpoints,
|
||||
proxycfg.GatewayKey{Datacenter: "dc1"},
|
||||
)
|
||||
require.Equal(t, tt.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ type targetInfo struct {
|
|||
// Region is the region from the failover target's Locality. nil means the
|
||||
// target is in the local Consul cluster.
|
||||
Region *string
|
||||
|
||||
PrioritizeByLocality *structs.DiscoveryPrioritizeByLocality
|
||||
}
|
||||
|
||||
type discoChainTargetGroup struct {
|
||||
|
@ -87,7 +89,7 @@ func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapsho
|
|||
var sni, rootPEMs string
|
||||
var spiffeIDs []string
|
||||
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
|
||||
ti := targetInfo{TargetID: tid}
|
||||
ti := targetInfo{TargetID: tid, PrioritizeByLocality: target.PrioritizeByLocality}
|
||||
|
||||
configureTLS := true
|
||||
if forMeshGateway {
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package xds
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func groupedEndpoints(locality *structs.Locality, policy *structs.DiscoveryPrioritizeByLocality, csns structs.CheckServiceNodes) ([]structs.CheckServiceNodes, error) {
|
||||
switch {
|
||||
case policy == nil || policy.Mode == "" || policy.Mode == "none":
|
||||
return []structs.CheckServiceNodes{csns}, nil
|
||||
case policy.Mode == "failover":
|
||||
return prioritizeByLocalityFailover(locality, csns), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected priortize-by-locality mode %q", policy.Mode)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
//go:build !consulent
|
||||
// +build !consulent
|
||||
|
||||
package xds
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func prioritizeByLocalityFailover(locality *structs.Locality, csns structs.CheckServiceNodes) []structs.CheckServiceNodes {
|
||||
return nil
|
||||
}
|
|
@ -40,6 +40,20 @@ func CatalogServiceExists(t *testing.T, c *api.Client, svc string, opts *api.Que
|
|||
})
|
||||
}
|
||||
|
||||
// CatalogServiceHasInstanceCount verifies the service name exists in the Consul catalog and has the specified
|
||||
// number of instances.
|
||||
func CatalogServiceHasInstanceCount(t *testing.T, c *api.Client, svc string, count int, opts *api.QueryOptions) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
services, _, err := c.Catalog().Service(svc, "", opts)
|
||||
if err != nil {
|
||||
r.Fatal("error reading service data")
|
||||
}
|
||||
if len(services) != count {
|
||||
r.Fatalf("did not find %d catalog entries for %s", count, svc)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// CatalogServiceExists verifies the node name exists in the Consul catalog
|
||||
func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
|
|
@ -46,6 +46,7 @@ type ServiceOpts struct {
|
|||
Checks Checks
|
||||
Connect SidecarService
|
||||
Namespace string
|
||||
Locality *api.Locality
|
||||
}
|
||||
|
||||
// createAndRegisterStaticServerAndSidecar register the services and launch static-server containers
|
||||
|
@ -119,6 +120,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
|
|||
Namespace: serviceOpts.Namespace,
|
||||
Meta: serviceOpts.Meta,
|
||||
Check: &agentCheck,
|
||||
Locality: serviceOpts.Locality,
|
||||
}
|
||||
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, containerArgs...)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue