mirror of https://github.com/hashicorp/consul
Merge pull request #7533 from hashicorp/dnephin/xds-server-1
agent/xds: small cleanuppull/7570/head
commit
190fc3c732
|
@ -22,16 +22,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
|
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
|
||||||
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ string) ([]proto.Message, error) {
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
return nil, errors.New("nil config given")
|
return nil, errors.New("nil config given")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch cfgSnap.Kind {
|
switch cfgSnap.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.clustersFromSnapshotConnectProxy(cfgSnap, token)
|
return s.clustersFromSnapshotConnectProxy(cfgSnap)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.clustersFromSnapshotMeshGateway(cfgSnap, token)
|
return s.clustersFromSnapshotMeshGateway(cfgSnap)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token st
|
||||||
|
|
||||||
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
||||||
// (upstreams) in the snapshot.
|
// (upstreams) in the snapshot.
|
||||||
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
// TODO(rb): this sizing is a low bound.
|
// TODO(rb): this sizing is a low bound.
|
||||||
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
|
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ func makeExposeClusterName(destinationPort int) string {
|
||||||
// clustersFromSnapshotMeshGateway returns the xDS API representation of the "clusters"
|
// clustersFromSnapshotMeshGateway returns the xDS API representation of the "clusters"
|
||||||
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
||||||
// 1 cluster for each service subset.
|
// 1 cluster for each service subset.
|
||||||
func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
datacenters := cfgSnap.MeshGateway.Datacenters()
|
datacenters := cfgSnap.MeshGateway.Datacenters()
|
||||||
|
|
||||||
// 1 cluster per remote dc + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
// 1 cluster per remote dc + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
||||||
|
|
|
@ -505,35 +505,6 @@ type customClusterJSONOptions struct {
|
||||||
TLSContext string
|
TLSContext string
|
||||||
}
|
}
|
||||||
|
|
||||||
var customEDSClusterJSONTpl = `{
|
|
||||||
{{ if .IncludeType -}}
|
|
||||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
|
||||||
{{- end }}
|
|
||||||
{{ if .TLSContext -}}
|
|
||||||
"tlsContext": {{ .TLSContext }},
|
|
||||||
{{- end }}
|
|
||||||
"name": "{{ .Name }}",
|
|
||||||
"type": "EDS",
|
|
||||||
"edsClusterConfig": {
|
|
||||||
"edsConfig": {
|
|
||||||
"ads": {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"connectTimeout": "5s"
|
|
||||||
}`
|
|
||||||
|
|
||||||
var customEDSClusterJSONTemplate = template.Must(template.New("").Parse(customEDSClusterJSONTpl))
|
|
||||||
|
|
||||||
func customEDSClusterJSON(t *testing.T, opts customClusterJSONOptions) string {
|
|
||||||
t.Helper()
|
|
||||||
var buf bytes.Buffer
|
|
||||||
err := customEDSClusterJSONTemplate.Execute(&buf, opts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return buf.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
var customAppClusterJSONTpl = `{
|
var customAppClusterJSONTpl = `{
|
||||||
{{ if .IncludeType -}}
|
{{ if .IncludeType -}}
|
||||||
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
"@type": "type.googleapis.com/envoy.api.v2.Cluster",
|
||||||
|
|
|
@ -22,16 +22,16 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
|
// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
|
||||||
func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ string) ([]proto.Message, error) {
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
return nil, errors.New("nil config given")
|
return nil, errors.New("nil config given")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch cfgSnap.Kind {
|
switch cfgSnap.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.endpointsFromSnapshotConnectProxy(cfgSnap, token)
|
return s.endpointsFromSnapshotConnectProxy(cfgSnap)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.endpointsFromSnapshotMeshGateway(cfgSnap, token)
|
return s.endpointsFromSnapshotMeshGateway(cfgSnap)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ func (s *Server) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s
|
||||||
|
|
||||||
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
|
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
|
||||||
// (upstream instances) in the snapshot.
|
// (upstream instances) in the snapshot.
|
||||||
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
resources := make([]proto.Message, 0,
|
resources := make([]proto.Message, 0,
|
||||||
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
|
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
|
||||||
|
|
||||||
|
@ -170,7 +170,7 @@ func (s *Server) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, en
|
||||||
return endpoints, nil
|
return endpoints, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
datacenters := cfgSnap.MeshGateway.Datacenters()
|
datacenters := cfgSnap.MeshGateway.Datacenters()
|
||||||
resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
|
resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token s
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return s.listenersFromSnapshotConnectProxy(cfgSnap, token)
|
return s.listenersFromSnapshotConnectProxy(cfgSnap, token)
|
||||||
case structs.ServiceKindMeshGateway:
|
case structs.ServiceKindMeshGateway:
|
||||||
return s.listenersFromSnapshotMeshGateway(cfgSnap, token)
|
return s.listenersFromSnapshotMeshGateway(cfgSnap)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,7 @@ func parseCheckPath(check structs.CheckType) (structs.ExposePath, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// listenersFromSnapshotMeshGateway returns the "listener" for a mesh-gateway service
|
// listenersFromSnapshotMeshGateway returns the "listener" for a mesh-gateway service
|
||||||
func (s *Server) listenersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func (s *Server) listenersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
cfg, err := ParseGatewayConfig(cfgSnap.Proxy.Config)
|
cfg, err := ParseGatewayConfig(cfgSnap.Proxy.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||||
|
|
|
@ -15,14 +15,14 @@ import (
|
||||||
|
|
||||||
// routesFromSnapshot returns the xDS API representation of the "routes" in the
|
// routesFromSnapshot returns the xDS API representation of the "routes" in the
|
||||||
// snapshot.
|
// snapshot.
|
||||||
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, _ string) ([]proto.Message, error) {
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
return nil, errors.New("nil config given")
|
return nil, errors.New("nil config given")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch cfgSnap.Kind {
|
switch cfgSnap.Kind {
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
return routesFromSnapshotConnectProxy(cfgSnap, token)
|
return routesFromSnapshotConnectProxy(cfgSnap)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ func routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto
|
||||||
|
|
||||||
// routesFromSnapshotConnectProxy returns the xDS API representation of the
|
// routesFromSnapshotConnectProxy returns the xDS API representation of the
|
||||||
// "routes" in the snapshot.
|
// "routes" in the snapshot.
|
||||||
func routesFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
func routesFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
return nil, errors.New("nil config given")
|
return nil, errors.New("nil config given")
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,8 +190,8 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
var nonce uint64
|
var nonce uint64
|
||||||
|
|
||||||
// xDS works with versions of configs. Internally we don't have a consistent
|
// xDS works with versions of configs. Internally we don't have a consistent
|
||||||
// version. We could just hash the config since versions don't have to be
|
// version. We could hash the config since versions don't have to be
|
||||||
// ordered as far as I can tell, but it's cheaper just to increment a counter
|
// ordered as far as I can tell, but it is cheaper to increment a counter
|
||||||
// every time we observe a new config since the upstream proxycfg package only
|
// every time we observe a new config since the upstream proxycfg package only
|
||||||
// delivers updates when there are actual changes.
|
// delivers updates when there are actual changes.
|
||||||
var configVersion uint64
|
var configVersion uint64
|
||||||
|
@ -209,12 +209,12 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
|
|
||||||
// Configure handlers for each type of request
|
// Configure handlers for each type of request
|
||||||
handlers := map[string]*xDSType{
|
handlers := map[string]*xDSType{
|
||||||
EndpointType: &xDSType{
|
EndpointType: {
|
||||||
typeURL: EndpointType,
|
typeURL: EndpointType,
|
||||||
resources: s.endpointsFromSnapshot,
|
resources: s.endpointsFromSnapshot,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
},
|
},
|
||||||
ClusterType: &xDSType{
|
ClusterType: {
|
||||||
typeURL: ClusterType,
|
typeURL: ClusterType,
|
||||||
resources: s.clustersFromSnapshot,
|
resources: s.clustersFromSnapshot,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
@ -223,12 +223,12 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
return cfgSnap.Kind == structs.ServiceKindMeshGateway
|
return cfgSnap.Kind == structs.ServiceKindMeshGateway
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
RouteType: &xDSType{
|
RouteType: {
|
||||||
typeURL: RouteType,
|
typeURL: RouteType,
|
||||||
resources: routesFromSnapshot,
|
resources: routesFromSnapshot,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
},
|
},
|
||||||
ListenerType: &xDSType{
|
ListenerType: {
|
||||||
typeURL: ListenerType,
|
typeURL: ListenerType,
|
||||||
resources: s.listenersFromSnapshot,
|
resources: s.listenersFromSnapshot,
|
||||||
stream: stream,
|
stream: stream,
|
||||||
|
@ -245,8 +245,7 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
token := tokenFromStream(stream)
|
rule, err := s.ResolveToken(tokenFromContext(stream.Context()))
|
||||||
rule, err := s.ResolveToken(token)
|
|
||||||
|
|
||||||
if acl.IsErrNotFound(err) {
|
if acl.IsErrNotFound(err) {
|
||||||
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
||||||
|
@ -397,7 +396,7 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
|
||||||
// Already sent this version
|
// Already sent this version
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
resources, err := t.resources(cfgSnap, tokenFromStream(t.stream))
|
resources, err := t.resources(cfgSnap, tokenFromContext(t.stream.Context()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -436,10 +435,6 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func tokenFromStream(stream ADSStream) string {
|
|
||||||
return tokenFromContext(stream.Context())
|
|
||||||
}
|
|
||||||
|
|
||||||
func tokenFromContext(ctx context.Context) string {
|
func tokenFromContext(ctx context.Context) string {
|
||||||
md, ok := metadata.FromIncomingContext(ctx)
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
// Package xds provides an impementation of a gRPC service that exports Envoy's
|
// Package xds provides an implementation of a gRPC service that exports Envoy's
|
||||||
// xDS API for config discovery. Specifically we support the Aggregated
|
// xDS API for config discovery. Specifically we support the Aggregated
|
||||||
// Discovery Service (ADS) only as we control all config.
|
// Discovery Service (ADS) only as we control all config.
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in New Issue