diff --git a/command/agent/agent.go b/command/agent/agent.go index 406e9ed068..b192705f16 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -141,6 +141,10 @@ type Agent struct { shutdownCh chan struct{} shutdownLock sync.Mutex + // retryJoinCh transports errors from the retry join + // attempts. + retryJoinCh chan error + // endpoints lets you override RPC endpoints for testing. Not all // agent methods use this, so use with care and never override // outside of a unit test. @@ -195,6 +199,7 @@ func NewAgent(c *Config) (*Agent, error) { eventCh: make(chan serf.UserEvent, 1024), eventBuf: make([]*UserEvent, 256), reloadCh: make(chan chan error), + retryJoinCh: make(chan error), shutdownCh: make(chan struct{}), endpoints: make(map[string]string), dnsAddrs: dnsAddrs, @@ -303,6 +308,11 @@ func (a *Agent) Start() error { } a.httpServers = append(a.httpServers, srv) } + + // start retry join + go a.retryJoin() + go a.retryJoinWan() + return nil } @@ -1127,6 +1137,12 @@ func (a *Agent) ReloadCh() chan chan error { return a.reloadCh } +// RetryJoinCh is a channel that transports errors +// from the retry join process. +func (a *Agent) RetryJoinCh() <-chan error { + return a.retryJoinCh +} + // ShutdownCh is used to return a channel that can be // selected to wait for the agent to perform a shutdown. func (a *Agent) ShutdownCh() <-chan struct{} { diff --git a/command/agent/command.go b/command/agent/command.go index 67999a3d9e..8a3cf74ef7 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -501,100 +501,6 @@ func (cmd *Command) startupJoinWan(cfg *Config) error { return nil } -// retryJoin is used to handle retrying a join until it succeeds or all -// retries are exhausted. -func (cmd *Command) retryJoin(cfg *Config, errCh chan<- struct{}) { - ec2Enabled := cfg.RetryJoinEC2.TagKey != "" && cfg.RetryJoinEC2.TagValue != "" - gceEnabled := cfg.RetryJoinGCE.TagValue != "" - azureEnabled := cfg.RetryJoinAzure.TagName != "" && cfg.RetryJoinAzure.TagValue != "" - - if len(cfg.RetryJoin) == 0 && !ec2Enabled && !gceEnabled && !azureEnabled { - return - } - - logger := cmd.agent.logger - logger.Printf("[INFO] agent: Joining cluster...") - - attempt := 0 - for { - var servers []string - var err error - switch { - case ec2Enabled: - servers, err = cfg.discoverEc2Hosts(logger) - if err != nil { - logger.Printf("[ERROR] agent: Unable to query EC2 instances: %s", err) - } - logger.Printf("[INFO] agent: Discovered %d servers from EC2", len(servers)) - case gceEnabled: - servers, err = cfg.discoverGCEHosts(logger) - if err != nil { - logger.Printf("[ERROR] agent: Unable to query GCE instances: %s", err) - } - logger.Printf("[INFO] agent: Discovered %d servers from GCE", len(servers)) - case azureEnabled: - servers, err = cfg.discoverAzureHosts(logger) - if err != nil { - logger.Printf("[ERROR] agent: Unable to query Azure instances: %s", err) - } - logger.Printf("[INFO] agent: Discovered %d servers from Azure", len(servers)) - } - - servers = append(servers, cfg.RetryJoin...) - if len(servers) == 0 { - err = fmt.Errorf("No servers to join") - } else { - n, err := cmd.agent.JoinLAN(servers) - if err == nil { - logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n) - return - } - } - - attempt++ - if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts { - logger.Printf("[ERROR] agent: max join retry exhausted, exiting") - close(errCh) - return - } - - logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, - cfg.RetryInterval) - time.Sleep(cfg.RetryInterval) - } -} - -// retryJoinWan is used to handle retrying a join -wan until it succeeds or all -// retries are exhausted. -func (cmd *Command) retryJoinWan(cfg *Config, errCh chan<- struct{}) { - if len(cfg.RetryJoinWan) == 0 { - return - } - - logger := cmd.agent.logger - logger.Printf("[INFO] agent: Joining WAN cluster...") - - attempt := 0 - for { - n, err := cmd.agent.JoinWAN(cfg.RetryJoinWan) - if err == nil { - logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n) - return - } - - attempt++ - if cfg.RetryMaxAttemptsWan > 0 && attempt > cfg.RetryMaxAttemptsWan { - logger.Printf("[ERROR] agent: max join -wan retry exhausted, exiting") - close(errCh) - return - } - - logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err, - cfg.RetryIntervalWan) - time.Sleep(cfg.RetryIntervalWan) - } -} - func (cmd *Command) Run(args []string) int { cmd.UI = &cli.PrefixedUi{ OutputPrefix: "==> ", @@ -819,20 +725,12 @@ func (cmd *Command) Run(args []string) int { cmd.UI.Output("Log data will now stream in as it occurs:\n") logGate.Flush() - // Start retry join process - errCh := make(chan struct{}) - go cmd.retryJoin(config, errCh) - - // Start retry -wan join process - errWanCh := make(chan struct{}) - go cmd.retryJoinWan(config, errWanCh) - // Wait for exit - return cmd.handleSignals(config, errCh, errWanCh) + return cmd.handleSignals(config) } // handleSignals blocks until we get an exit-causing signal -func (cmd *Command) handleSignals(cfg *Config, retryJoin <-chan struct{}, retryJoinWan <-chan struct{}) int { +func (cmd *Command) handleSignals(cfg *Config) int { signalCh := make(chan os.Signal, 4) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE) @@ -844,14 +742,13 @@ WAIT: select { case s := <-signalCh: sig = s - case ch := <-cmd.agent.reloadCh: + case ch := <-cmd.agent.ReloadCh(): sig = syscall.SIGHUP reloadErrCh = ch case <-cmd.ShutdownCh: sig = os.Interrupt - case <-retryJoin: - return 1 - case <-retryJoinWan: + case err := <-cmd.agent.RetryJoinCh(): + cmd.UI.Error(err.Error()) return 1 case <-cmd.agent.ShutdownCh(): // Agent is already shutdown! diff --git a/command/agent/retry_join.go b/command/agent/retry_join.go new file mode 100644 index 0000000000..a2d68559c1 --- /dev/null +++ b/command/agent/retry_join.go @@ -0,0 +1,97 @@ +package agent + +import ( + "fmt" + "time" +) + +// RetryJoin is used to handle retrying a join until it succeeds or all +// retries are exhausted. +func (a *Agent) retryJoin() { + cfg := a.config + + ec2Enabled := cfg.RetryJoinEC2.TagKey != "" && cfg.RetryJoinEC2.TagValue != "" + gceEnabled := cfg.RetryJoinGCE.TagValue != "" + azureEnabled := cfg.RetryJoinAzure.TagName != "" && cfg.RetryJoinAzure.TagValue != "" + + if len(cfg.RetryJoin) == 0 && !ec2Enabled && !gceEnabled && !azureEnabled { + return + } + + a.logger.Printf("[INFO] agent: Joining cluster...") + attempt := 0 + for { + var servers []string + var err error + switch { + case ec2Enabled: + servers, err = cfg.discoverEc2Hosts(a.logger) + if err != nil { + a.logger.Printf("[ERROR] agent: Unable to query EC2 instances: %s", err) + } + a.logger.Printf("[INFO] agent: Discovered %d servers from EC2", len(servers)) + case gceEnabled: + servers, err = cfg.discoverGCEHosts(a.logger) + if err != nil { + a.logger.Printf("[ERROR] agent: Unable to query GCE instances: %s", err) + } + a.logger.Printf("[INFO] agent: Discovered %d servers from GCE", len(servers)) + case azureEnabled: + servers, err = cfg.discoverAzureHosts(a.logger) + if err != nil { + a.logger.Printf("[ERROR] agent: Unable to query Azure instances: %s", err) + } + a.logger.Printf("[INFO] agent: Discovered %d servers from Azure", len(servers)) + } + + servers = append(servers, cfg.RetryJoin...) + if len(servers) == 0 { + err = fmt.Errorf("No servers to join") + } else { + n, err := a.JoinLAN(servers) + if err == nil { + a.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n) + return + } + } + + attempt++ + if cfg.RetryMaxAttempts > 0 && attempt > cfg.RetryMaxAttempts { + a.retryJoinCh <- fmt.Errorf("agent: max join retry exhausted, exiting") + return + } + + a.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, cfg.RetryInterval) + time.Sleep(cfg.RetryInterval) + } +} + +// RetryJoinWan is used to handle retrying a join -wan until it succeeds or all +// retries are exhausted. +func (a *Agent) retryJoinWan() { + cfg := a.config + + if len(cfg.RetryJoinWan) == 0 { + return + } + + a.logger.Printf("[INFO] agent: Joining WAN cluster...") + + attempt := 0 + for { + n, err := a.JoinWAN(cfg.RetryJoinWan) + if err == nil { + a.logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n) + return + } + + attempt++ + if cfg.RetryMaxAttemptsWan > 0 && attempt > cfg.RetryMaxAttemptsWan { + a.retryJoinCh <- fmt.Errorf("agent: max join -wan retry exhausted, exiting") + return + } + + a.logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err, cfg.RetryIntervalWan) + time.Sleep(cfg.RetryIntervalWan) + } +}