You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/proxycfg/manager.go

358 lines
10 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package proxycfg
import (
"errors"
"github.com/hashicorp/consul/lib/channels"
"runtime/debug"
"sync"
"github.com/hashicorp/go-hclog"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/tlsutil"
)
// ProxyID is a handle on a proxy service instance being tracked by Manager.
type ProxyID struct {
structs.ServiceID
// NodeName identifies the node to which the proxy is registered.
NodeName string
// Token is used to track watches on the same proxy with different ACL tokens
// separately, to prevent accidental security bugs.
//
// Note: this can be different to the ACL token used for authorization that is
// passed to Register (e.g. agent-local services are registered ahead-of-time
// with a token that may be different to the one presented in the xDS stream).
Token string
}
// ProxySource identifies where a proxy service tracked by Manager came from,
// such as the agent's local state or the catalog. It's used to prevent sources
// from overwriting each other's registrations.
type ProxySource string
// Manager provides an API with which proxy services can be registered, and
// coordinates the fetching (and refreshing) of intentions, upstreams, discovery
// chain, certificates etc.
//
// Consumers such as the xDS server can then subscribe to receive snapshots of
// this data whenever it changes.
//
// See package docs for more detail.
type Manager struct {
ManagerConfig
rateLimiter *rate.Limiter
mu sync.Mutex
proxies map[ProxyID]*state
watchers map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot
maxWatchID uint64
}
// ManagerConfig holds the required external dependencies for a Manager
// instance. All fields must be set to something valid or the manager will
// panic. The ManagerConfig is passed by value to NewManager so the passed value
// can be mutated safely.
type ManagerConfig struct {
// DataSources contains the dependencies used to consume data used to configure
// proxies.
DataSources DataSources
// source describes the current agent's identity, it's used directly for
// prepared query discovery but also indirectly as a way to pass current
// Datacenter name into other request types that need it. This is sufficient
// for now and cleaner than passing the entire RuntimeConfig.
Source *structs.QuerySource
// DNSConfig is the agent's relevant DNS config for any proxies.
DNSConfig DNSConfig
// logger is the agent's logger to be used for logging logs.
Logger hclog.Logger
TLSConfigurator *tlsutil.Configurator
// IntentionDefaultAllow is set by the agent so that we can pass this
// information to proxies that need to make intention decisions on their
// own.
IntentionDefaultAllow bool
// UpdateRateLimit controls the rate at which config snapshots are delivered
// when updates are received from data sources. This enables us to reduce the
// impact of updates to "global" resources (e.g. proxy-defaults and wildcard
// intentions) that could otherwise saturate system resources, and cause Raft
// or gossip instability.
//
// Defaults to rate.Inf (no rate limit).
UpdateRateLimit rate.Limit
}
// NewManager constructs a Manager.
func NewManager(cfg ManagerConfig) (*Manager, error) {
if cfg.Source == nil || cfg.Logger == nil {
return nil, errors.New("all ManagerConfig fields must be provided")
}
if cfg.UpdateRateLimit == 0 {
cfg.UpdateRateLimit = rate.Inf
}
m := &Manager{
ManagerConfig: cfg,
proxies: make(map[ProxyID]*state),
watchers: make(map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot),
rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1),
}
return m, nil
}
// UpdateRateLimit returns the configured update rate limit (see ManagerConfig).
func (m *Manager) UpdateRateLimit() rate.Limit {
return m.rateLimiter.Limit()
}
// SetUpdateRateLimit configures the update rate limit (see ManagerConfig).
func (m *Manager) SetUpdateRateLimit(l rate.Limit) {
m.rateLimiter.SetLimit(l)
}
// RegisteredProxies returns a list of the proxies tracked by Manager, filtered
// by source.
func (m *Manager) RegisteredProxies(source ProxySource) []ProxyID {
m.mu.Lock()
defer m.mu.Unlock()
proxies := make([]ProxyID, 0, len(m.proxies))
for id, state := range m.proxies {
if state.source != source {
continue
}
proxies = append(proxies, id)
}
return proxies
}
// Register and start fetching resources for the given proxy service. If the
// given service was already registered by a different source (e.g. we began
// tracking it from the catalog, but then it was registered to the server
// agent locally) the service will be left as-is unless overwrite is true.
func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
m.mu.Lock()
defer m.mu.Unlock()
defer func() {
if r := recover(); r != nil {
m.Logger.Error("unexpected panic during service manager registration",
"node", id.NodeName,
"service", id.ServiceID,
"message", r,
"stacktrace", string(debug.Stack()),
)
}
}()
return m.register(id, ns, source, token, overwrite)
}
func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
state, ok := m.proxies[id]
if ok && !state.stoppedRunning() {
if state.source != source && !overwrite {
// Registered by a different source, leave as-is.
return nil
}
if !state.Changed(ns, token) {
// No change
return nil
}
// We are updating the proxy, close its old state
state.Close(false)
}
// TODO: move to a function that translates ManagerConfig->stateConfig
stateConfig := stateConfig{
logger: m.Logger.With("service_id", id.String()),
dataSources: m.DataSources,
source: m.Source,
dnsConfig: m.DNSConfig,
intentionDefaultAllow: m.IntentionDefaultAllow,
}
if m.TLSConfigurator != nil {
stateConfig.serverSNIFn = m.TLSConfigurator.ServerSNI
}
var err error
state, err = newState(id, ns, source, token, stateConfig, m.rateLimiter)
if err != nil {
return err
}
if _, err = state.Watch(); err != nil {
return err
}
m.proxies[id] = state
// Start a goroutine that will wait for changes and broadcast them to watchers.
go m.notifyBroadcast(id, state)
return nil
}
// Deregister the given proxy service, but only if it was registered by the same
// source.
func (m *Manager) Deregister(id ProxyID, source ProxySource) {
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.proxies[id]
if !ok {
return
}
if state.source != source {
return
}
// Closing state will let the goroutine we started in Register finish since
// watch chan is closed
state.Close(false)
delete(m.proxies, id)
// We intentionally leave potential watchers hanging here - there is no new
// config for them and closing their channels might be indistinguishable from
// an error that they should retry. We rely for them to eventually give up
// (because they are in fact not running any more) and so the watches be
// cleaned up naturally.
}
func (m *Manager) notifyBroadcast(proxyID ProxyID, state *state) {
// Run until ch is closed (by a defer in state.run).
for snap := range state.snapCh {
m.notify(&snap)
}
// If state.run exited because of an irrecoverable error, close all of the
// watchers so that the consumers reconnect/retry at a higher level.
if state.failed() {
m.closeAllWatchers(proxyID)
}
}
func (m *Manager) notify(snap *ConfigSnapshot) {
m.mu.Lock()
defer m.mu.Unlock()
watchers, ok := m.watchers[snap.ProxyID]
if !ok {
return
}
for _, ch := range watchers {
m.deliverLatest(snap, ch)
}
}
// deliverLatest delivers the snapshot to a watch chan. If the delivery blocks,
// it will drain the chan and then re-attempt delivery so that a slow consumer
// gets the latest config earlier. This MUST be called from a method where m.mu
// is held to be safe since it assumes we are the only goroutine sending on ch.
func (m *Manager) deliverLatest(snap proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) {
m.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", snap.(*ConfigSnapshot).ProxyID)
err := channels.DeliverLatest(snap, ch)
if err != nil {
m.Logger.Error("failed to deliver proxyState to proxy",
"proxy", snap.(*ConfigSnapshot).ProxyID,
)
}
}
// Watch registers a watch on a proxy. It might not exist yet in which case this
// will not fail, but no updates will be delivered until the proxy is
// registered. If there is already a valid snapshot in memory, it will be
// delivered immediately.
func (m *Manager) Watch(id ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) {
m.mu.Lock()
defer m.mu.Unlock()
// This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one.
ch := make(chan proxysnapshot.ProxySnapshot, 1)
watchers, ok := m.watchers[id]
if !ok {
watchers = make(map[uint64]chan proxysnapshot.ProxySnapshot)
}
watchID := m.maxWatchID
m.maxWatchID++
watchers[watchID] = ch
m.watchers[id] = watchers
// Deliver the current snapshot immediately if there is one ready
if state, ok := m.proxies[id]; ok {
if snap := state.CurrentSnapshot(); snap != nil {
// We rely on ch being buffered above and that it's not been passed
// anywhere so we must be the only writer so this will never block and
// deadlock.
ch <- snap
}
}
return ch, func() {
m.mu.Lock()
defer m.mu.Unlock()
m.closeWatchLocked(id, watchID)
}
}
func (m *Manager) closeAllWatchers(proxyID ProxyID) {
m.mu.Lock()
defer m.mu.Unlock()
watchers, ok := m.watchers[proxyID]
if !ok {
return
}
for watchID := range watchers {
m.closeWatchLocked(proxyID, watchID)
}
}
// closeWatchLocked cleans up state related to a single watcher. It assumes the
// lock is held.
func (m *Manager) closeWatchLocked(proxyID ProxyID, watchID uint64) {
if watchers, ok := m.watchers[proxyID]; ok {
if ch, ok := watchers[watchID]; ok {
delete(watchers, watchID)
close(ch)
if len(watchers) == 0 {
delete(m.watchers, proxyID)
}
}
}
}
// Close removes all state and stops all running goroutines.
func (m *Manager) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
// Close all current watchers first
for proxyID, watchers := range m.watchers {
for watchID := range watchers {
m.closeWatchLocked(proxyID, watchID)
}
}
// Then close all states
for proxyID, state := range m.proxies {
state.Close(false)
delete(m.proxies, proxyID)
}
return nil
}