mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
231 lines
8.9 KiB
231 lines
8.9 KiB
package xds |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"sync/atomic" |
|
"time" |
|
|
|
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
|
|
|
"github.com/armon/go-metrics" |
|
"github.com/armon/go-metrics/prometheus" |
|
"github.com/hashicorp/go-hclog" |
|
"google.golang.org/grpc" |
|
"google.golang.org/grpc/codes" |
|
"google.golang.org/grpc/status" |
|
|
|
"github.com/hashicorp/consul/acl" |
|
"github.com/hashicorp/consul/agent/grpc/public" |
|
"github.com/hashicorp/consul/agent/proxycfg" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/agent/xds/xdscommon" |
|
) |
|
|
|
var StatsGauges = []prometheus.GaugeDefinition{ |
|
{ |
|
Name: []string{"xds", "server", "streams"}, |
|
Help: "Measures the number of active xDS streams handled by the server split by protocol version.", |
|
}, |
|
} |
|
|
|
// ADSStream is a shorter way of referring to this thing... |
|
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer |
|
|
|
const (
	// PublicListenerName is the name we give the public listener in Envoy config.
	PublicListenerName = "public_listener"

	// OutboundListenerName is the name we give the outbound Envoy listener when
	// transparent proxy mode is enabled.
	OutboundListenerName = "outbound_listener"

	// LocalAppClusterName is the name we give the local application "cluster" in
	// Envoy config. Note that all cluster names may collide with service names
	// since we want cluster names and service names to match to enable nice
	// metrics correlation without massaging prefixes on cluster names.
	//
	// We should probably make this more unlikely to collide; however, changing
	// it potentially breaks upgrade compatibility without restarting all Envoys,
	// as it will no longer match their existing cluster name. Changing this
	// will affect metrics output so could break dashboards (for local app
	// traffic).
	//
	// We should probably just make it configurable if anyone actually has
	// services named "local_app" in the future.
	LocalAppClusterName = "local_app"

	// LocalAgentClusterName is the name we give the local agent "cluster" in
	// Envoy config. Note that all cluster names may collide with service names
	// since we want cluster names and service names to match to enable nice
	// metrics correlation without massaging prefixes on cluster names.
	//
	// We should probably make this more unlikely to collide; however, changing
	// it potentially breaks upgrade compatibility without restarting all Envoys,
	// as it will no longer match their existing cluster name. Changing this
	// will affect metrics output so could break dashboards (for local agent
	// traffic).
	//
	// We should probably just make it configurable if anyone actually has
	// services named "local_agent" in the future.
	LocalAgentClusterName = "local_agent"

	// OriginalDestinationClusterName is the name we give to the passthrough
	// cluster which redirects transparently-proxied requests to their original
	// destination outside the mesh. This cluster prevents Consul from blocking
	// connections to destinations outside of the catalog when in transparent
	// proxy mode.
	OriginalDestinationClusterName = "original-destination"

	// DefaultAuthCheckFrequency is the default value for
	// Server.AuthCheckFrequency to use when the zero value is provided.
	DefaultAuthCheckFrequency = 5 * time.Minute
)
|
|
|
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far |
|
// entirely agent-local and all uses private methods this allows a simple shim |
|
// to be written in the agent package to allow resolving without tightly |
|
// coupling this to the agent. |
|
type ACLResolverFunc func(id string) (acl.Authorizer, error) |
|
|
|
// ConfigFetcher is the interface the agent needs to expose for the xDS server
// to fetch agent config. Currently only one field is fetched.
type ConfigFetcher interface {
	// AdvertiseAddrLAN returns the agent's LAN advertise address as a string.
	AdvertiseAddrLAN() string
}
|
|
|
// ProxyConfigSource is the interface xds.Server requires to consume proxy |
|
// config updates. |
|
type ProxyConfigSource interface { |
|
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) |
|
} |
|
|
|
// Server represents a gRPC server that can handle xDS requests from Envoy. All |
|
// of it's public members must be set before the gRPC server is started. |
|
// |
|
// A full description of the XDS protocol can be found at |
|
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol |
|
type Server struct { |
|
NodeName string |
|
Logger hclog.Logger |
|
CfgSrc ProxyConfigSource |
|
ResolveToken ACLResolverFunc |
|
CfgFetcher ConfigFetcher |
|
|
|
// AuthCheckFrequency is how often we should re-check the credentials used |
|
// during a long-lived gRPC Stream after it has been initially established. |
|
// This is only used during idle periods of stream interactions (i.e. when |
|
// there has been no recent DiscoveryRequest). |
|
AuthCheckFrequency time.Duration |
|
|
|
// ResourceMapMutateFn exclusively exists for testing purposes. |
|
ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources) |
|
|
|
activeStreams *activeStreamCounters |
|
serverlessPluginEnabled bool |
|
} |
|
|
|
// activeStreamCounters simply encapsulates two counters accessed atomically to
// ensure alignment is correct. This further requires that activeStreamCounters
// be a pointer field.
//
// Both fields are uint64 values updated through sync/atomic; keep them as the
// leading fields of the struct so their alignment is preserved.
//
// TODO(eculver): refactor to remove xDSv2 refs
type activeStreamCounters struct {
	// xDSv3 is the number of active streams registered under version "v3".
	xDSv3 uint64
	// xDSv2 is the number of active streams registered under version "v2".
	xDSv2 uint64
}
|
|
|
func (c *activeStreamCounters) Increment(xdsVersion string) func() { |
|
var counter *uint64 |
|
switch xdsVersion { |
|
case "v3": |
|
counter = &c.xDSv3 |
|
case "v2": |
|
counter = &c.xDSv2 |
|
default: |
|
return func() {} |
|
} |
|
|
|
labels := []metrics.Label{{Name: "version", Value: xdsVersion}} |
|
|
|
count := atomic.AddUint64(counter, 1) |
|
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels) |
|
return func() { |
|
count := atomic.AddUint64(counter, ^uint64(0)) |
|
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels) |
|
} |
|
} |
|
|
|
func NewServer( |
|
nodeName string, |
|
logger hclog.Logger, |
|
serverlessPluginEnabled bool, |
|
cfgMgr ProxyConfigSource, |
|
resolveToken ACLResolverFunc, |
|
cfgFetcher ConfigFetcher, |
|
) *Server { |
|
return &Server{ |
|
NodeName: nodeName, |
|
Logger: logger, |
|
CfgSrc: cfgMgr, |
|
ResolveToken: resolveToken, |
|
CfgFetcher: cfgFetcher, |
|
AuthCheckFrequency: DefaultAuthCheckFrequency, |
|
activeStreams: &activeStreamCounters{}, |
|
serverlessPluginEnabled: serverlessPluginEnabled, |
|
} |
|
} |
|
|
|
// StreamAggregatedResources implements |
|
// envoy_discovery_v3.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is |
|
// the only xDS API we directly support for now. |
|
// |
|
// Deprecated: use DeltaAggregatedResources instead |
|
func (s *Server) StreamAggregatedResources(stream ADSStream) error { |
|
return errors.New("not implemented") |
|
} |
|
|
|
// Register the XDS server handlers to the given gRPC server. |
|
func (s *Server) Register(srv *grpc.Server) { |
|
envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) |
|
} |
|
|
|
// authorize the xDS request using the token stored in ctx. This authorization is |
|
// a bit different from most interfaces. Instead of explicitly authorizing or |
|
// filtering each piece of data in the response, the request is authorized |
|
// by checking the token has `service:write` for the service ID of the destination |
|
// service (for kind=ConnectProxy), or the gateway service (for other kinds). |
|
// This authorization strategy requires that agent/proxycfg only fetches data |
|
// using a token with the same permissions, and that it stores the data by |
|
// proxy ID. We assume that any data in the snapshot was already filtered, |
|
// which allows this authorization to be a shallow authorization check |
|
// for all the data in a ConfigSnapshot. |
|
func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot) error { |
|
if cfgSnap == nil { |
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") |
|
} |
|
|
|
authz, err := s.ResolveToken(public.TokenFromContext(ctx)) |
|
if acl.IsErrNotFound(err) { |
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) |
|
} else if acl.IsErrPermissionDenied(err) { |
|
return status.Error(codes.PermissionDenied, err.Error()) |
|
} else if err != nil { |
|
return status.Errorf(codes.Internal, "error resolving acl token: %v", err) |
|
} |
|
|
|
var authzContext acl.AuthorizerContext |
|
switch cfgSnap.Kind { |
|
case structs.ServiceKindConnectProxy: |
|
cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) |
|
if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(cfgSnap.Proxy.DestinationServiceName, &authzContext); err != nil { |
|
return status.Errorf(codes.PermissionDenied, err.Error()) |
|
} |
|
case structs.ServiceKindMeshGateway, structs.ServiceKindTerminatingGateway, structs.ServiceKindIngressGateway: |
|
cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) |
|
if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(cfgSnap.Service, &authzContext); err != nil { |
|
return status.Errorf(codes.PermissionDenied, err.Error()) |
|
} |
|
default: |
|
return status.Errorf(codes.Internal, "Invalid service kind") |
|
} |
|
|
|
// Authed OK! |
|
return nil |
|
}
|
|
|