Prepare for having different service kinds that are all generic… (#6013)

Default to internal error when service kind is unknown
pull/6017/head
Matt Keeler 2019-06-24 15:05:36 -04:00 committed by GitHub
parent 43c5ba0304
commit 813e009a2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 15 deletions

View File

@ -9,6 +9,7 @@ import (
// It is meant to be point-in-time coherent and is used to deliver the current // It is meant to be point-in-time coherent and is used to deliver the current
// config state to observers who need it to be pushed in (e.g. XDS server). // config state to observers who need it to be pushed in (e.g. XDS server).
type ConfigSnapshot struct { type ConfigSnapshot struct {
Kind structs.ServiceKind
ProxyID string ProxyID string
Address string Address string
Port int Port int
@ -22,7 +23,12 @@ type ConfigSnapshot struct {
// Valid returns whether or not the snapshot has all required fields filled yet. // Valid returns whether or not the snapshot has all required fields filled yet.
func (s *ConfigSnapshot) Valid() bool { func (s *ConfigSnapshot) Valid() bool {
switch s.Kind {
case structs.ServiceKindConnectProxy:
return s.Roots != nil && s.Leaf != nil return s.Roots != nil && s.Leaf != nil
default:
return false
}
} }
// Clone makes a deep copy of the snapshot we can send to other goroutines // Clone makes a deep copy of the snapshot we can send to other goroutines

View File

@ -38,6 +38,7 @@ type state struct {
ctx context.Context ctx context.Context
cancel func() cancel func()
kind structs.ServiceKind
proxyID string proxyID string
address string address string
port int port int
@ -72,6 +73,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
} }
return &state{ return &state{
kind: ns.Kind,
proxyID: ns.ID, proxyID: ns.ID,
address: ns.Address, address: ns.Address,
port: ns.Port, port: ns.Port,
@ -116,9 +118,19 @@ func (s *state) Close() error {
return nil return nil
} }
// initWatches sets up the watches needed based on current proxy registration // initWatches sets up the watches needed for the particular service
// state.
func (s *state) initWatches() error { func (s *state) initWatches() error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.initWatchesConnectProxy()
default:
return fmt.Errorf("Unsupported service kind")
}
}
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
// state.
func (s *state) initWatchesConnectProxy() error {
// Watch for root changes // Watch for root changes
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
@ -203,12 +215,18 @@ func (s *state) run() {
defer close(s.snapCh) defer close(s.snapCh)
snap := ConfigSnapshot{ snap := ConfigSnapshot{
Kind: s.kind,
ProxyID: s.proxyID, ProxyID: s.proxyID,
Address: s.address, Address: s.address,
Port: s.port, Port: s.port,
Proxy: s.proxyCfg, Proxy: s.proxyCfg,
UpstreamEndpoints: make(map[string]structs.CheckServiceNodes),
} }
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.UpstreamEndpoints = make(map[string]structs.CheckServiceNodes)
}
// This turns out to be really fiddly/painful by just using time.Timer.C // This turns out to be really fiddly/painful by just using time.Timer.C
// directly in the code below since you can't detect when a timer is stopped // directly in the code below since you can't detect when a timer is stopped
// vs waiting in order to know to reset it. So just use a chan to send // vs waiting in order to know to reset it. So just use a chan to send
@ -282,6 +300,15 @@ func (s *state) run() {
} }
func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error { func (s *state) handleUpdate(u cache.UpdateEvent, snap *ConfigSnapshot) error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.handleUpdateConnectProxy(u, snap)
default:
return fmt.Errorf("Unsupported service kind")
}
}
func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapshot) error {
switch u.CorrelationID { switch u.CorrelationID {
case rootsWatchID: case rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots) roots, ok := u.Result.(*structs.IndexedCARoots)
@ -340,7 +367,7 @@ func (s *state) Changed(ns *structs.NodeService, token string) bool {
if ns == nil { if ns == nil {
return true return true
} }
return ns.Kind != structs.ServiceKindConnectProxy || return ns.Kind != s.kind ||
s.proxyID != ns.ID || s.proxyID != ns.ID ||
s.address != ns.Address || s.address != ns.Address ||
s.port != ns.Port || s.port != ns.Port ||

View File

@ -19,9 +19,23 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return s.clustersFromSnapshotConnectProxy(cfgSnap, token)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// clustersFromSnapshot returns the xDS API representation of the "clusters" // clustersFromSnapshot returns the xDS API representation of the "clusters"
// (upstreams) in the snapshot. // (upstreams) in the snapshot.
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil { if cfgSnap == nil {
return nil, errors.New("nil config given") return nil, errors.New("nil config given")
} }

View File

@ -2,6 +2,7 @@ package xds
import ( import (
"errors" "errors"
"fmt"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
@ -14,11 +15,25 @@ import (
) )
// endpointsFromSnapshot returns the xDS API representation of the "endpoints" // endpointsFromSnapshot returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil { if cfgSnap == nil {
return nil, errors.New("nil config given") return nil, errors.New("nil config given")
} }
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return endpointsFromSnapshotConnectProxy(cfgSnap, token)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
// (upstream instances) in the snapshot.
func endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints)) resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
for id, endpoints := range cfgSnap.UpstreamEndpoints { for id, endpoints := range cfgSnap.UpstreamEndpoints {
la := makeLoadAssignment(id, endpoints) la := makeLoadAssignment(id, endpoints)

View File

@ -24,13 +24,27 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
// listenersFromSnapshot returns the xDS API representation of the "listeners" // listenersFromSnapshot returns the xDS API representation of the "listeners" in the snapshot.
// in the snapshot.
func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) { func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil { if cfgSnap == nil {
return nil, errors.New("nil config given") return nil, errors.New("nil config given")
} }
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return s.listenersFromSnapshotConnectProxy(cfgSnap, token)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// listenersFromSnapshotConnectProxy returns the xDS API representation of the "listeners"
// in the snapshot.
func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
// One listener for each upstream plus the public one // One listener for each upstream plus the public one
resources := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1) resources := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)

View File

@ -235,9 +235,14 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
return err return err
} }
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
return status.Errorf(codes.PermissionDenied, "permission denied") return status.Errorf(codes.PermissionDenied, "permission denied")
} }
default:
return status.Errorf(codes.Internal, "Invalid service kind")
}
// Authed OK! // Authed OK!
return nil return nil