mirror of https://github.com/hashicorp/consul
385 lines
13 KiB
Go
385 lines
13 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package xdsv2
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
|
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
|
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
|
|
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
|
|
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
"github.com/hashicorp/consul/envoyextensions/xdscommon"
|
|
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
|
)
|
|
|
|
func (pr *ProxyResources) doesEnvoyClusterAlreadyExist(name string) bool {
|
|
// TODO(proxystate): consider using a map instead of [] for this kind of lookup
|
|
for _, envoyCluster := range pr.envoyResources[xdscommon.ClusterType] {
|
|
if envoyCluster.(*envoy_cluster_v3.Cluster).Name == name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (pr *ProxyResources) makeXDSClusters() ([]proto.Message, error) {
|
|
clusters := make([]proto.Message, 0)
|
|
|
|
for clusterName := range pr.proxyState.Clusters {
|
|
protoCluster, err := pr.makeClusters(clusterName)
|
|
// TODO: aggregate errors for clusters and still return any properly formed clusters.
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, protoCluster...)
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeClusters(name string) ([]proto.Message, error) {
|
|
clusters := make([]proto.Message, 0)
|
|
proxyStateCluster, ok := pr.proxyState.Clusters[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("cluster %q not found", name)
|
|
}
|
|
|
|
if pr.doesEnvoyClusterAlreadyExist(name) {
|
|
// don't error
|
|
return []proto.Message{}, nil
|
|
}
|
|
|
|
switch proxyStateCluster.Group.(type) {
|
|
case *pbproxystate.Cluster_FailoverGroup:
|
|
fg := proxyStateCluster.GetFailoverGroup()
|
|
clusters, err := pr.makeEnvoyAggregateCluster(name, proxyStateCluster.Protocol, fg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, c := range clusters {
|
|
clusters = append(clusters, c)
|
|
}
|
|
|
|
case *pbproxystate.Cluster_EndpointGroup:
|
|
eg := proxyStateCluster.GetEndpointGroup()
|
|
cluster, err := pr.makeEnvoyCluster(name, proxyStateCluster.Protocol, eg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
|
|
default:
|
|
return nil, errors.New("cluster group type should be Endpoint Group or Failover Group")
|
|
}
|
|
return clusters, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyCluster(name string, protocol string, eg *pbproxystate.EndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
if eg != nil {
|
|
switch t := eg.Group.(type) {
|
|
case *pbproxystate.EndpointGroup_Dynamic:
|
|
dynamic := eg.GetDynamic()
|
|
return pr.makeEnvoyDynamicCluster(name, protocol, dynamic)
|
|
case *pbproxystate.EndpointGroup_Static:
|
|
static := eg.GetStatic()
|
|
return pr.makeEnvoyStaticCluster(name, protocol, static)
|
|
case *pbproxystate.EndpointGroup_Dns:
|
|
dns := eg.GetDns()
|
|
return pr.makeEnvoyDnsCluster(name, protocol, dns)
|
|
case *pbproxystate.EndpointGroup_Passthrough:
|
|
passthrough := eg.GetPassthrough()
|
|
return pr.makeEnvoyPassthroughCluster(name, protocol, passthrough)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported endpoint group type: %s", t)
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no endpoint group")
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol string, dynamic *pbproxystate.DynamicEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
|
|
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoy_core_v3.ConfigSource{
|
|
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
|
|
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
|
|
Ads: &envoy_core_v3.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
err := addHttpProtocolOptions(protocol, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if dynamic.Config != nil {
|
|
if dynamic.Config.UseAltStatName {
|
|
cluster.AltStatName = name
|
|
}
|
|
cluster.ConnectTimeout = dynamic.Config.ConnectTimeout
|
|
if !dynamic.Config.DisablePanicThreshold {
|
|
cluster.CommonLbConfig = &envoy_cluster_v3.Cluster_CommonLbConfig{
|
|
HealthyPanicThreshold: &envoy_type_v3.Percent{
|
|
Value: 0, // disable panic threshold
|
|
},
|
|
}
|
|
}
|
|
addEnvoyCircuitBreakers(dynamic.Config.CircuitBreakers, cluster)
|
|
addEnvoyOutlierDetection(dynamic.Config.OutlierDetection, cluster)
|
|
|
|
err := addEnvoyLBToCluster(dynamic.Config, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if dynamic.OutboundTls != nil {
|
|
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(dynamic.OutboundTls)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cluster.TransportSocket = envoyTransportSocket
|
|
}
|
|
|
|
return cluster, nil
|
|
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol string, static *pbproxystate.StaticEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_STATIC},
|
|
}
|
|
|
|
// todo (ishustava/v2): we need to be able to handle the case when empty endpoints are allowed on a cluster.
|
|
endpointList, ok := pr.proxyState.Endpoints[name]
|
|
if ok {
|
|
cluster.LoadAssignment = makeEnvoyClusterLoadAssignment(name, endpointList.Endpoints)
|
|
}
|
|
|
|
var err error
|
|
if name == xdscommon.LocalAppClusterName {
|
|
err = addLocalAppHttpProtocolOptions(protocol, cluster)
|
|
} else {
|
|
err = addHttpProtocolOptions(protocol, cluster)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if static.Config != nil {
|
|
cluster.ConnectTimeout = static.Config.ConnectTimeout
|
|
addEnvoyCircuitBreakers(static.GetConfig().CircuitBreakers, cluster)
|
|
}
|
|
return cluster, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyDnsCluster(name string, protocol string, dns *pbproxystate.DNSEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol string, passthrough *pbproxystate.PassthroughEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ConnectTimeout: passthrough.Config.ConnectTimeout,
|
|
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_ORIGINAL_DST},
|
|
}
|
|
if passthrough.OutboundTls != nil {
|
|
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(passthrough.OutboundTls)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cluster.TransportSocket = envoyTransportSocket
|
|
}
|
|
err := addHttpProtocolOptions(protocol, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cluster, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol string, fg *pbproxystate.FailoverGroup) ([]*envoy_cluster_v3.Cluster, error) {
|
|
var clusters []*envoy_cluster_v3.Cluster
|
|
if fg != nil {
|
|
var egNames []string
|
|
for _, eg := range fg.EndpointGroups {
|
|
cluster, err := pr.makeEnvoyCluster(eg.Name, protocol, eg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
egNames = append(egNames, cluster.Name)
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
aggregateClusterConfig, err := anypb.New(&envoy_aggregate_cluster_v3.ClusterConfig{
|
|
Clusters: egNames,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ConnectTimeout: fg.Config.ConnectTimeout,
|
|
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_ClusterType{
|
|
ClusterType: &envoy_cluster_v3.Cluster_CustomClusterType{
|
|
Name: "envoy.clusters.aggregate",
|
|
TypedConfig: aggregateClusterConfig,
|
|
},
|
|
},
|
|
}
|
|
if fg.Config.UseAltStatName {
|
|
c.AltStatName = name
|
|
}
|
|
err = addHttpProtocolOptions(protocol, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, c)
|
|
}
|
|
return clusters, nil
|
|
}
|
|
|
|
func addLocalAppHttpProtocolOptions(protocol string, c *envoy_cluster_v3.Cluster) error {
|
|
if !(protocol == "http2" || protocol == "grpc") {
|
|
// do not error. returning nil means it won't get set.
|
|
return nil
|
|
}
|
|
cfg := &envoy_upstreams_v3.HttpProtocolOptions{
|
|
UpstreamProtocolOptions: &envoy_upstreams_v3.HttpProtocolOptions_UseDownstreamProtocolConfig{
|
|
UseDownstreamProtocolConfig: &envoy_upstreams_v3.HttpProtocolOptions_UseDownstreamHttpConfig{
|
|
HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
|
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
|
},
|
|
},
|
|
}
|
|
any, err := anypb.New(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.TypedExtensionProtocolOptions = map[string]*anypb.Any{
|
|
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": any,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func addHttpProtocolOptions(protocol string, c *envoy_cluster_v3.Cluster) error {
|
|
if !(protocol == "http2" || protocol == "grpc") {
|
|
// do not error. returning nil means it won't get set.
|
|
return nil
|
|
}
|
|
cfg := &envoy_upstreams_v3.HttpProtocolOptions{
|
|
UpstreamProtocolOptions: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig_{
|
|
ExplicitHttpConfig: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig{
|
|
ProtocolConfig: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig_Http2ProtocolOptions{
|
|
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
any, err := anypb.New(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.TypedExtensionProtocolOptions = map[string]*anypb.Any{
|
|
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": any,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// addEnvoyOutlierDetection will add outlier detection config to the cluster, and if nil, add empty OutlierDetection to
|
|
// enable it with default values
|
|
func addEnvoyOutlierDetection(outlierDetection *pbproxystate.OutlierDetection, c *envoy_cluster_v3.Cluster) {
|
|
if outlierDetection == nil {
|
|
return
|
|
}
|
|
od := &envoy_cluster_v3.OutlierDetection{
|
|
BaseEjectionTime: outlierDetection.GetBaseEjectionTime(),
|
|
Consecutive_5Xx: outlierDetection.GetConsecutive_5Xx(),
|
|
EnforcingConsecutive_5Xx: outlierDetection.GetEnforcingConsecutive_5Xx(),
|
|
Interval: outlierDetection.GetInterval(),
|
|
MaxEjectionPercent: outlierDetection.GetMaxEjectionPercent(),
|
|
}
|
|
c.OutlierDetection = od
|
|
|
|
}
|
|
|
|
func addEnvoyCircuitBreakers(circuitBreakers *pbproxystate.CircuitBreakers, c *envoy_cluster_v3.Cluster) {
|
|
if circuitBreakers != nil {
|
|
if circuitBreakers.UpstreamLimits == nil {
|
|
c.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{}
|
|
return
|
|
}
|
|
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
|
|
threshold.MaxConnections = circuitBreakers.UpstreamLimits.MaxConnections
|
|
threshold.MaxPendingRequests = circuitBreakers.UpstreamLimits.MaxPendingRequests
|
|
threshold.MaxRequests = circuitBreakers.UpstreamLimits.MaxConcurrentRequests
|
|
|
|
c.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
|
|
Thresholds: []*envoy_cluster_v3.CircuitBreakers_Thresholds{threshold},
|
|
}
|
|
}
|
|
}
|
|
|
|
func addEnvoyLBToCluster(dynamicConfig *pbproxystate.DynamicEndpointGroupConfig, c *envoy_cluster_v3.Cluster) error {
|
|
if dynamicConfig == nil || dynamicConfig.LbPolicy == nil {
|
|
return nil
|
|
}
|
|
|
|
switch d := dynamicConfig.LbPolicy.(type) {
|
|
case *pbproxystate.DynamicEndpointGroupConfig_LeastRequest:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_LEAST_REQUEST
|
|
|
|
lb := dynamicConfig.LbPolicy.(*pbproxystate.DynamicEndpointGroupConfig_LeastRequest)
|
|
if lb.LeastRequest != nil {
|
|
c.LbConfig = &envoy_cluster_v3.Cluster_LeastRequestLbConfig_{
|
|
LeastRequestLbConfig: &envoy_cluster_v3.Cluster_LeastRequestLbConfig{
|
|
ChoiceCount: lb.LeastRequest.ChoiceCount,
|
|
},
|
|
}
|
|
}
|
|
case *pbproxystate.DynamicEndpointGroupConfig_RoundRobin:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_ROUND_ROBIN
|
|
|
|
case *pbproxystate.DynamicEndpointGroupConfig_Random:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_RANDOM
|
|
|
|
case *pbproxystate.DynamicEndpointGroupConfig_RingHash:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_RING_HASH
|
|
|
|
lb := dynamicConfig.LbPolicy.(*pbproxystate.DynamicEndpointGroupConfig_RingHash)
|
|
if lb.RingHash != nil {
|
|
c.LbConfig = &envoy_cluster_v3.Cluster_RingHashLbConfig_{
|
|
RingHashLbConfig: &envoy_cluster_v3.Cluster_RingHashLbConfig{
|
|
MinimumRingSize: lb.RingHash.MinimumRingSize,
|
|
MaximumRingSize: lb.RingHash.MaximumRingSize,
|
|
},
|
|
}
|
|
}
|
|
case *pbproxystate.DynamicEndpointGroupConfig_Maglev:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_MAGLEV
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported load balancer policy %q for cluster %q", d, c.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO(proxystate): In a future PR this will create clusters and add it to ProxyResources.proxyState
|
|
// Currently, we do not traverse the listener -> endpoint paths and instead just generate each resource by iterating
|
|
// through its top level map. In the future we want to traverse these paths to ensure each listener has a cluster, etc.
|
|
func (pr *ProxyResources) makeEnvoyClusterFromL4Destination(l4 *pbproxystate.L4Destination) error {
|
|
return nil
|
|
}
|