You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/proxycfg/manager.go

387 lines
12 KiB

package proxycfg
import (
"errors"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/tlsutil"
)
// Sentinel errors returned by Manager.Run.
var (
// ErrStopped is returned from Run if the manager instance has already been
// stopped, i.e. Close was called before Run.
ErrStopped = errors.New("manager stopped")
// ErrStarted is returned from Run if Run has already been called on this
// manager instance.
ErrStarted = errors.New("manager was already run")
)
// CancelFunc is the type of the function returned alongside a watch channel
// (see Manager.Watch). Calling it cancels that watch.
type CancelFunc func()
// Manager is a component that integrates into the agent and manages Connect
// proxy configuration state. This should not be confused with the deprecated
// "managed proxy" concept where the agent supervises the actual proxy process.
// proxycfg.Manager is oblivious to the distinction and manages state for any
// service registered with Kind == connect-proxy.
//
// The Manager ensures that any Connect proxy registered on the agent has all
// the state it needs cached locally via the agent cache. State includes
// certificates, intentions, and service discovery results for any declared
// upstreams. See package docs for more detail.
type Manager struct {
ManagerConfig
// stateCh is notified for any service changes in local state. We only use
// this to trigger on _new_ service addition since it has no data and we don't
// want to maintain a full copy of the state in order to diff and figure out
// what changed. Luckily each service has its own WatchCh so we can figure
// out changes and removals with those efficiently.
//
// It is closed and set to nil by Close, which is how Run detects that the
// manager has been stopped.
stateCh chan struct{}
// mu guards started, stateCh, proxies and watchers.
mu sync.Mutex
// started records that Run has been called, so a second call can be rejected
// with ErrStarted.
started bool
// proxies holds the per-proxy state for every proxy or gateway service that
// is currently being tracked, keyed by service ID.
proxies map[structs.ServiceID]*state
// watchers holds, for each proxy service ID, the channels registered through
// Watch, keyed by a per-proxy watch index.
watchers map[structs.ServiceID]map[uint64]chan *ConfigSnapshot
}
// ManagerConfig holds the required external dependencies for a Manager
// instance. NewManager rejects a config whose Cache, State, Source or Logger
// is nil; the remaining fields are not validated there, so callers should
// still set them to something valid. The ManagerConfig is passed by value to
// NewManager so the passed value can be mutated safely.
type ManagerConfig struct {
// Cache is the agent's cache instance that can be used to retrieve, store and
// monitor state for the proxies.
Cache *cache.Cache
// Health provides service health updates on a notification channel.
Health Health
// State is the agent's local state to be watched for new proxy registrations.
State *local.State
// Source describes the current agent's identity, it's used directly for
// prepared query discovery but also indirectly as a way to pass current
// Datacenter name into other request types that need it. This is sufficient
// for now and cleaner than passing the entire RuntimeConfig.
Source *structs.QuerySource
// DNSConfig is the agent's relevant DNS config for any proxies.
DNSConfig DNSConfig
// Logger is the agent's logger; each proxy's state gets a child logger
// derived from it.
Logger hclog.Logger
// TLSConfigurator is optional. When non-nil, its ServerSNI method is handed
// to each proxy's state as the server SNI function.
TLSConfigurator *tlsutil.Configurator
// Tokens configured on the local agent. Used to look up the agent token if
// a service is registered without a token.
Tokens *token.Store
// IntentionDefaultAllow is set by the agent so that we can pass this
// information to proxies that need to make intention decisions on their
// own.
IntentionDefaultAllow bool
}
// NewManager builds a Manager around the supplied dependencies. It returns an
// error when any of Cache, State, Source or Logger is nil; the other
// ManagerConfig fields are accepted as given.
func NewManager(cfg ManagerConfig) (*Manager, error) {
	switch {
	case cfg.Cache == nil, cfg.State == nil, cfg.Source == nil, cfg.Logger == nil:
		return nil, errors.New("all ManagerConfig fields must be provided")
	}

	// The notification channel carries no data, so a one-slot buffer is all
	// that is needed: a pending signal already means "something changed" and
	// further signals can be coalesced into it without losing information.
	notifications := make(chan struct{}, 1)

	return &Manager{
		ManagerConfig: cfg,
		stateCh:       notifications,
		proxies:       make(map[structs.ServiceID]*state),
		watchers:      make(map[structs.ServiceID]map[uint64]chan *ConfigSnapshot),
	}, nil
}
// Run is the manager's long-running sync loop and should be given its own
// goroutine. It subscribes to local state notifications, performs an initial
// sync, and then re-syncs on every notification until the notification channel
// is closed by Close, at which point it returns nil.
//
// Run returns ErrStarted if it has been called before on this Manager, and
// ErrStopped if Close was called before Run.
func (m *Manager) Run() error {
	// Read and update the lifecycle fields in a single critical section so
	// that concurrent Run and Close calls see a consistent view.
	m.mu.Lock()
	wasStarted, notifyCh := m.started, m.stateCh
	m.started = true
	m.mu.Unlock()

	switch {
	case wasStarted:
		// Guard against multiple Run calls.
		return ErrStarted
	case notifyCh == nil:
		// Guard against being run after Close.
		return ErrStopped
	}

	// Subscribe to local state changes for the lifetime of the loop.
	m.State.Notify(notifyCh)
	defer m.State.StopNotify(notifyCh)

	for {
		m.syncState(m.notifyBroadcast)

		// Block until the next change; a closed channel means we were stopped.
		if _, open := <-notifyCh; !open {
			return nil
		}
	}
}
// syncState reconciles the manager's tracked proxies with the agent's local
// state. It holds m.mu for the whole pass: every proxy or gateway service
// found in local state is added or refreshed, and every tracked proxy that is
// no longer present in local state is removed. notifyBroadcast is forwarded to
// ensureProxyServiceLocked, which runs it in a goroutine per (re)created proxy.
func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
	m.mu.Lock()
	defer m.mu.Unlock()

	registered := m.State.AllServices()

	// First pass: make sure every proxy-like service is being tracked.
	for id, service := range registered {
		switch service.Kind {
		case structs.ServiceKindConnectProxy,
			structs.ServiceKindTerminatingGateway,
			structs.ServiceKindMeshGateway,
			structs.ServiceKindIngressGateway:
			// Handled below.
		default:
			// Not a proxy or gateway; nothing for us to manage.
			continue
		}

		// TODO(banks): need to work out when to default some stuff. For example
		// Proxy.LocalServicePort is practically necessary for any sidecar and can
		// default to the port of the sidecar service, but only if it's already
		// registered and once we get past here, we don't have enough context to
		// know that so we'd need to set it here if not during registration of the
		// proxy service. Sidecar Service in the interim can do that, but we should
		// validate more generally that that is always true.
		if err := m.ensureProxyServiceLocked(service, notifyBroadcast); err != nil {
			m.Logger.Error("failed to watch proxy service",
				"service", id.String(),
				"error", err,
			)
		}
	}

	// Second pass: drop anything we track that local state no longer has.
	for trackedID := range m.proxies {
		if _, present := registered[trackedID]; present {
			continue
		}
		m.removeProxyServiceLocked(trackedID)
	}
}
// ensureProxyServiceLocked adds the proxy described by ns to the manager, or
// replaces its state when the registration or token has changed. The caller
// must hold m.mu. On success for a new or changed proxy, notifyBroadcast is
// started in its own goroutine with the new state's snapshot channel. An error
// from creating or watching the new state is returned to the caller.
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
	sid := ns.CompoundServiceID()

	// Prefer the token the service was registered with and fall back to the
	// default user token. This token is expected to match the token used in
	// the xDS request for this data.
	serviceToken := m.State.ServiceToken(sid)
	if serviceToken == "" {
		serviceToken = m.Tokens.UserToken()
	}

	if existing, tracked := m.proxies[sid]; tracked {
		if !existing.Changed(ns, serviceToken) {
			// Nothing changed; keep the current state running.
			return nil
		}
		// The proxy is being updated, so shut down its previous state first.
		existing.Close()
	}

	// TODO: move to a function that translates ManagerConfig->stateConfig
	cfg := stateConfig{
		logger:                m.Logger.With("service_id", sid.String()),
		cache:                 m.Cache,
		health:                m.Health,
		source:                m.Source,
		dnsConfig:             m.DNSConfig,
		intentionDefaultAllow: m.IntentionDefaultAllow,
	}
	if m.TLSConfigurator != nil {
		cfg.serverSNIFn = m.TLSConfigurator.ServerSNI
	}

	proxyState, err := newState(ns, serviceToken, cfg)
	if err != nil {
		return err
	}

	snapshots, err := proxyState.Watch()
	if err != nil {
		return err
	}
	m.proxies[sid] = proxyState

	// Fan snapshots out to watchers from a dedicated goroutine; it runs for as
	// long as notifyBroadcast keeps consuming the channel.
	go notifyBroadcast(snapshots)

	return nil
}
// notifyBroadcast forwards every snapshot received on ch to the watchers of
// the corresponding proxy. It blocks until ch is closed, so it is meant to be
// run in its own goroutine.
func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
	// Run until ch is closed
	for snap := range ch {
		// notify queues the pointer on watcher channels, where it outlives this
		// iteration. Take a per-iteration copy so that, on toolchains where the
		// range variable is shared across iterations (Go < 1.22), a later
		// snapshot cannot overwrite one a watcher has not read yet.
		snap := snap
		m.notify(&snap)
	}
}
// removeProxyServiceLocked stops tracking the proxy with the given ID: its
// state is closed and its entry is removed from m.proxies. It is a no-op for
// an unknown ID. The caller must hold m.mu.
func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
	if proxyState, tracked := m.proxies[proxyID]; tracked {
		// Closing the state closes its watch chan, which lets the broadcast
		// goroutine started in ensureProxyServiceLocked finish.
		proxyState.Close()
		delete(m.proxies, proxyID)
	}

	// We intentionally leave potential watchers hanging here - there is no new
	// config for them and closing their channels might be indistinguishable from
	// an error that they should retry. We rely on them to eventually give up
	// (because they are in fact not running any more) and so the watches will be
	// cleaned up naturally.
}
// notify hands snap to every watcher currently registered for snap.ProxyID.
// It takes m.mu for the duration of the delivery, which is what makes it the
// only sender on the watcher channels at that moment.
func (m *Manager) notify(snap *ConfigSnapshot) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Ranging over a missing (nil) entry simply does nothing, so no separate
	// existence check is needed.
	for _, watchCh := range m.watchers[snap.ProxyID] {
		m.deliverLatest(snap, watchCh)
	}
}
// deliverLatest puts snap on a watch chan without ever blocking. When the chan
// is full it discards whatever older snapshots are queued and then tries again,
// so a slow consumer sees the newest config sooner. It MUST be called with m.mu
// held, because it assumes no other goroutine is sending on ch.
func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan *ConfigSnapshot) {
	// Fast path: there is room in the buffer.
	select {
	case ch <- snap:
		return
	default:
	}

	// The buffer is full. Throw away stale snapshots until the chan is empty.
	// Today the chans are 1-buffered, but this works for any buffer size.
	for drained := false; !drained; {
		select {
		case <-ch:
		default:
			drained = true
		}
	}

	// Retry the send now that there is space.
	select {
	case ch <- snap:
	default:
		// This should not be possible since we should be the only sender, enforced
		// by m.mu but error and drop the update rather than panic.
		m.Logger.Error("failed to deliver ConfigSnapshot to proxy",
			"proxy", snap.ProxyID.String(),
		)
	}
}
// Watch registers a watch on a proxy. It might not exist yet in which case this
// will not fail, but no updates will be delivered until the proxy is
// registered. If there is already a valid snapshot in memory, it will be
// delivered immediately.
//
// The returned channel is buffered with a capacity of one and always carries
// the most recent snapshot. The returned CancelFunc removes the watch and
// closes the channel; it is safe to call more than once and does nothing if
// the watch has already been closed (for example by Close).
func (m *Manager) Watch(proxyID structs.ServiceID) (<-chan *ConfigSnapshot, CancelFunc) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// This buffering is crucial otherwise we'd block immediately trying to
	// deliver the current snapshot below if we already have one.
	ch := make(chan *ConfigSnapshot, 1)

	watchers, ok := m.watchers[proxyID]
	if !ok {
		watchers = make(map[uint64]chan *ConfigSnapshot)
	}

	// Pick an index that is not in use. len(watchers) alone is not enough: once
	// an earlier watch has been cancelled the length can equal the index of a
	// watch that is still live, and reusing it would silently replace (and
	// leak) that watcher's channel. The map holds len(watchers) entries, so a
	// free index is found within len(watchers)+1 probes.
	idx := uint64(len(watchers))
	for {
		if _, taken := watchers[idx]; !taken {
			break
		}
		idx++
	}
	watchers[idx] = ch
	m.watchers[proxyID] = watchers

	// Deliver the current snapshot immediately if there is one ready
	if state, ok := m.proxies[proxyID]; ok {
		if snap := state.CurrentSnapshot(); snap != nil {
			// We rely on ch being buffered above and that it's not been passed
			// anywhere so we must be the only writer so this will never block and
			// deadlock.
			ch <- snap
		}
	}

	return ch, func() {
		m.mu.Lock()
		defer m.mu.Unlock()

		// Indexes can be handed out again after a watch is closed, so only tear
		// down the slot if it still holds this watch's channel. Otherwise a
		// repeated or late cancel could close a different watcher's channel.
		// Indexing a missing inner map yields a nil chan, which never equals ch.
		if m.watchers[proxyID][idx] != ch {
			return
		}
		m.closeWatchLocked(proxyID, idx)
	}
}
// closeWatchLocked removes one watcher, identified by proxy ID and watch
// index, and closes its channel. When that was the last watcher for the proxy
// the proxy's entry in m.watchers is dropped as well. Unknown proxy IDs or
// indexes are ignored. The caller must hold m.mu.
func (m *Manager) closeWatchLocked(proxyID structs.ServiceID, watchIdx uint64) {
	perProxy, known := m.watchers[proxyID]
	if !known {
		return
	}

	watchCh, registered := perProxy[watchIdx]
	if !registered {
		return
	}

	delete(perProxy, watchIdx)
	close(watchCh)

	// Don't keep an empty inner map around once the last watcher is gone.
	if len(perProxy) == 0 {
		delete(m.watchers, proxyID)
	}
}
// Close shuts the manager down: it closes the state notification channel
// (which makes a running Run return), closes every registered watcher channel,
// and closes and forgets every tracked proxy state. It may be called more than
// once and always returns nil.
func (m *Manager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Closing the channel is the stop signal for Run; clearing the field makes
	// repeated Close calls harmless and lets a later Run report ErrStopped.
	if notifyCh := m.stateCh; notifyCh != nil {
		m.stateCh = nil
		close(notifyCh)
	}

	// Watchers go first...
	for id, perProxy := range m.watchers {
		for watchIdx := range perProxy {
			m.closeWatchLocked(id, watchIdx)
		}
	}

	// ...followed by the per-proxy states.
	for id, proxyState := range m.proxies {
		proxyState.Close()
		delete(m.proxies, id)
	}

	return nil
}