mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
424 lines
12 KiB
424 lines
12 KiB
package autoconf |
|
|
|
import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/config"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/proto/pbautoconf"
)
|
|
|
// AutoConfig is all the state necessary for being able to parse a configuration |
|
// as well as perform the necessary RPCs to perform Agent Auto Configuration. |
|
type AutoConfig struct { |
|
sync.Mutex |
|
|
|
acConfig Config |
|
logger hclog.Logger |
|
cache Cache |
|
waiter *retry.Waiter |
|
config *config.RuntimeConfig |
|
autoConfigResponse *pbautoconf.AutoConfigResponse |
|
autoConfigSource config.Source |
|
|
|
running bool |
|
done chan struct{} |
|
// cancel is used to cancel the entire AutoConfig |
|
// go routine. This is the main field protected |
|
// by the mutex as it being non-nil indicates that |
|
// the go routine has been started and is stoppable. |
|
// note that it doesn't indcate that the go routine |
|
// is currently running. |
|
cancel context.CancelFunc |
|
|
|
// cancelWatches is used to cancel the existing |
|
// cache watches regarding the agents certificate. This is |
|
// mainly only necessary when the Agent token changes. |
|
cancelWatches context.CancelFunc |
|
|
|
// cacheUpdates is the chan used to have the cache |
|
// send us back events |
|
cacheUpdates chan cache.UpdateEvent |
|
|
|
// tokenUpdates is the struct used to receive |
|
// events from the token store when the Agent |
|
// token is updated. |
|
tokenUpdates token.Notifier |
|
} |
|
|
|
// New creates a new AutoConfig object for providing automatic Consul configuration. |
|
func New(config Config) (*AutoConfig, error) { |
|
switch { |
|
case config.Loader == nil: |
|
return nil, fmt.Errorf("must provide a config loader") |
|
case config.DirectRPC == nil: |
|
return nil, fmt.Errorf("must provide a direct RPC delegate") |
|
case config.Cache == nil: |
|
return nil, fmt.Errorf("must provide a cache") |
|
case config.TLSConfigurator == nil: |
|
return nil, fmt.Errorf("must provide a TLS configurator") |
|
case config.Tokens == nil: |
|
return nil, fmt.Errorf("must provide a token store") |
|
} |
|
|
|
if config.FallbackLeeway == 0 { |
|
config.FallbackLeeway = 10 * time.Second |
|
} |
|
if config.FallbackRetry == 0 { |
|
config.FallbackRetry = time.Minute |
|
} |
|
|
|
logger := config.Logger |
|
if logger == nil { |
|
logger = hclog.NewNullLogger() |
|
} else { |
|
logger = logger.Named(logging.AutoConfig) |
|
} |
|
|
|
if config.Waiter == nil { |
|
config.Waiter = &retry.Waiter{ |
|
MinFailures: 1, |
|
MaxWait: 10 * time.Minute, |
|
Jitter: retry.NewJitter(25), |
|
} |
|
} |
|
|
|
if err := config.EnterpriseConfig.validateAndFinalize(); err != nil { |
|
return nil, err |
|
} |
|
|
|
return &AutoConfig{ |
|
acConfig: config, |
|
logger: logger, |
|
}, nil |
|
} |
|
|
|
// ReadConfig will parse the current configuration and inject any |
|
// auto-config sources if present into the correct place in the parsing chain. |
|
func (ac *AutoConfig) ReadConfig() (*config.RuntimeConfig, error) { |
|
ac.Lock() |
|
defer ac.Unlock() |
|
result, err := ac.acConfig.Loader(ac.autoConfigSource) |
|
if err != nil { |
|
return result.RuntimeConfig, err |
|
} |
|
|
|
for _, w := range result.Warnings { |
|
ac.logger.Warn(w) |
|
} |
|
|
|
ac.config = result.RuntimeConfig |
|
return ac.config, nil |
|
} |
|
|
|
// InitialConfiguration will perform a one-time RPC request to the configured servers |
|
// to retrieve various cluster wide configurations. See the proto/pbautoconf/auto_config.proto |
|
// file for a complete reference of what configurations can be applied in this manner. |
|
// The returned configuration will be the new configuration with any auto-config settings |
|
// already applied. If AutoConfig is not enabled this method will just parse any |
|
// local configuration and return the built runtime configuration. |
|
// |
|
// The context passed in can be used to cancel the retrieval of the initial configuration |
|
// like when receiving a signal during startup. |
|
func (ac *AutoConfig) InitialConfiguration(ctx context.Context) (*config.RuntimeConfig, error) { |
|
if err := ac.maybeLoadConfig(); err != nil { |
|
return nil, err |
|
} |
|
|
|
switch { |
|
case ac.config.AutoConfig.Enabled: |
|
resp, err := ac.readPersistedAutoConfig() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if resp == nil { |
|
ac.logger.Info("retrieving initial agent auto configuration remotely") |
|
resp, err = ac.getInitialConfiguration(ctx) |
|
if err != nil { |
|
return nil, err |
|
} |
|
} |
|
|
|
ac.logger.Debug("updating auto-config settings") |
|
if err = ac.recordInitialConfiguration(resp); err != nil { |
|
return nil, err |
|
} |
|
|
|
// re-read the configuration now that we have our initial auto-config |
|
config, err := ac.ReadConfig() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
ac.config = config |
|
return ac.config, nil |
|
case ac.config.AutoEncryptTLS: |
|
certs, err := ac.autoEncryptInitialCerts(ctx) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if err := ac.setInitialTLSCertificates(certs); err != nil { |
|
return nil, err |
|
} |
|
|
|
ac.logger.Info("automatically upgraded to TLS") |
|
return ac.config, nil |
|
default: |
|
return ac.config, nil |
|
} |
|
} |
|
|
|
// maybeLoadConfig will read the Consul configuration using the |
|
// provided config loader if and only if the config field of |
|
// the struct is nil. When it does this it will fill in that |
|
// field. If the config field already is non-nil then this |
|
// is a noop. |
|
func (ac *AutoConfig) maybeLoadConfig() error { |
|
if ac.config == nil { |
|
config, err := ac.ReadConfig() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
ac.config = config |
|
} |
|
return nil |
|
} |
|
|
|
// introToken is responsible for determining the correct intro token to use |
|
// when making the initial AutoConfig.InitialConfiguration RPC request. |
|
func (ac *AutoConfig) introToken() (string, error) { |
|
conf := ac.config.AutoConfig |
|
// without an intro token or intro token file we cannot do anything |
|
if conf.IntroToken == "" && conf.IntroTokenFile == "" { |
|
return "", fmt.Errorf("neither intro_token or intro_token_file settings are not configured") |
|
} |
|
|
|
token := conf.IntroToken |
|
if token == "" { |
|
// load the intro token from the file |
|
content, err := os.ReadFile(conf.IntroTokenFile) |
|
if err != nil { |
|
return "", fmt.Errorf("Failed to read intro token from file: %w", err) |
|
} |
|
|
|
token = string(content) |
|
|
|
if token == "" { |
|
return "", fmt.Errorf("intro_token_file did not contain any token") |
|
} |
|
} |
|
|
|
return token, nil |
|
} |
|
|
|
// recordInitialConfiguration is responsible for recording the AutoConfigResponse from |
|
// the AutoConfig.InitialConfiguration RPC. It is an all-in-one function to do the following |
|
// - update the Agent token in the token store |
|
func (ac *AutoConfig) recordInitialConfiguration(resp *pbautoconf.AutoConfigResponse) error { |
|
ac.autoConfigResponse = resp |
|
|
|
ac.autoConfigSource = config.LiteralSource{ |
|
Name: autoConfigFileName, |
|
Config: translateConfig(resp.Config), |
|
} |
|
|
|
// we need to re-read the configuration to determine what the correct ACL |
|
// token to push into the token store is. Any user provided token will override |
|
// any AutoConfig generated token. |
|
config, err := ac.ReadConfig() |
|
if err != nil { |
|
return fmt.Errorf("failed to fully resolve configuration: %w", err) |
|
} |
|
|
|
// ignoring the return value which would indicate a change in the token |
|
_ = ac.acConfig.Tokens.UpdateAgentToken(config.ACLTokens.ACLAgentToken, token.TokenSourceConfig) |
|
|
|
// extra a structs.SignedResponse from the AutoConfigResponse for use in cache prepopulation |
|
signed, err := extractSignedResponse(resp) |
|
if err != nil { |
|
return fmt.Errorf("failed to extract certificates from the auto-config response: %w", err) |
|
} |
|
|
|
// prepopulate the cache |
|
if err = ac.populateCertificateCache(signed); err != nil { |
|
return fmt.Errorf("failed to populate the cache with certificate responses: %w", err) |
|
} |
|
|
|
// update the TLS configurator with the latest certificates |
|
if err := ac.updateTLSFromResponse(resp); err != nil { |
|
return err |
|
} |
|
|
|
return ac.persistAutoConfig(resp) |
|
} |
|
|
|
// getInitialConfigurationOnce will perform full server to TCPAddr resolution and |
|
// loop through each host trying to make the AutoConfig.InitialConfiguration RPC call. When |
|
// successful the bool return will be true and the err value will indicate whether we |
|
// successfully recorded the auto config settings (persisted to disk and stored internally |
|
// on the AutoConfig object) |
|
func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context, csr string, key string) (*pbautoconf.AutoConfigResponse, error) { |
|
token, err := ac.introToken() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
request := pbautoconf.AutoConfigRequest{ |
|
Datacenter: ac.config.Datacenter, |
|
Node: ac.config.NodeName, |
|
Segment: ac.config.SegmentName, |
|
Partition: ac.config.PartitionOrEmpty(), |
|
JWT: token, |
|
CSR: csr, |
|
} |
|
|
|
var resp pbautoconf.AutoConfigResponse |
|
|
|
servers, err := ac.autoConfigHosts() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
for _, s := range servers { |
|
// try each IP to see if we can successfully make the request |
|
for _, addr := range ac.resolveHost(s) { |
|
if ctx.Err() != nil { |
|
return nil, ctx.Err() |
|
} |
|
|
|
ac.logger.Debug("making AutoConfig.InitialConfiguration RPC", "addr", addr.String()) |
|
if err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &resp); err != nil { |
|
ac.logger.Error("AutoConfig.InitialConfiguration RPC failed", "addr", addr.String(), "error", err) |
|
continue |
|
} |
|
ac.logger.Debug("AutoConfig.InitialConfiguration RPC was successful") |
|
|
|
// update the Certificate with the private key we generated locally |
|
if resp.Certificate != nil { |
|
resp.Certificate.PrivateKeyPEM = key |
|
} |
|
|
|
return &resp, nil |
|
} |
|
} |
|
|
|
return nil, fmt.Errorf("No server successfully responded to the auto-config request") |
|
} |
|
|
|
// getInitialConfiguration implements a loop to retry calls to getInitialConfigurationOnce. |
|
// It uses the RetryWaiter on the AutoConfig object to control how often to attempt |
|
// the initial configuration process. It is also canceallable by cancelling the provided context. |
|
func (ac *AutoConfig) getInitialConfiguration(ctx context.Context) (*pbautoconf.AutoConfigResponse, error) { |
|
// generate a CSR |
|
csr, key, err := ac.generateCSR() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
ac.acConfig.Waiter.Reset() |
|
for { |
|
resp, err := ac.getInitialConfigurationOnce(ctx, csr, key) |
|
switch { |
|
case err == nil && resp != nil: |
|
return resp, nil |
|
case err != nil: |
|
ac.logger.Error(err.Error()) |
|
default: |
|
ac.logger.Error("No error returned when fetching configuration from the servers but no response was either") |
|
} |
|
|
|
if err := ac.acConfig.Waiter.Wait(ctx); err != nil { |
|
ac.logger.Info("interrupted during initial auto configuration", "err", err) |
|
return nil, err |
|
} |
|
} |
|
} |
|
|
|
func (ac *AutoConfig) Start(ctx context.Context) error { |
|
ac.Lock() |
|
defer ac.Unlock() |
|
|
|
if !ac.config.AutoConfig.Enabled && !ac.config.AutoEncryptTLS { |
|
return nil |
|
} |
|
|
|
if ac.running || ac.cancel != nil { |
|
return fmt.Errorf("AutoConfig is already running") |
|
} |
|
|
|
// create the top level context to control the go |
|
// routine executing the `run` method |
|
ctx, cancel := context.WithCancel(ctx) |
|
|
|
// create the channel to get cache update events through |
|
// really we should only ever get 10 updates |
|
ac.cacheUpdates = make(chan cache.UpdateEvent, 10) |
|
|
|
// setup the cache watches |
|
cancelCertWatches, err := ac.setupCertificateCacheWatches(ctx) |
|
if err != nil { |
|
cancel() |
|
return fmt.Errorf("error setting up cache watches: %w", err) |
|
} |
|
|
|
// start the token update notifier |
|
ac.tokenUpdates = ac.acConfig.Tokens.Notify(token.TokenKindAgent) |
|
|
|
// store the cancel funcs |
|
ac.cancel = cancel |
|
ac.cancelWatches = cancelCertWatches |
|
|
|
ac.running = true |
|
ac.done = make(chan struct{}) |
|
go ac.run(ctx, ac.done) |
|
|
|
ac.logger.Info("auto-config started") |
|
return nil |
|
} |
|
|
|
func (ac *AutoConfig) Done() <-chan struct{} { |
|
ac.Lock() |
|
defer ac.Unlock() |
|
|
|
if ac.done != nil { |
|
return ac.done |
|
} |
|
|
|
// return a closed channel to indicate that we are already done |
|
done := make(chan struct{}) |
|
close(done) |
|
return done |
|
} |
|
|
|
func (ac *AutoConfig) IsRunning() bool { |
|
ac.Lock() |
|
defer ac.Unlock() |
|
return ac.running |
|
} |
|
|
|
func (ac *AutoConfig) Stop() bool { |
|
ac.Lock() |
|
defer ac.Unlock() |
|
|
|
if !ac.running { |
|
return false |
|
} |
|
|
|
if ac.cancel != nil { |
|
ac.cancel() |
|
} |
|
|
|
return true |
|
}
|
|
|