diff --git a/command/agent/command.go b/command/agent/command.go index 6aa51ac7c8..768d337c32 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -53,6 +53,7 @@ func (c *Command) readConfig() *Config { var cmdConfig Config var configFiles []string var retryInterval string + var retryWanInterval string cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError) cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } @@ -83,12 +84,20 @@ func (c *Command) readConfig() *Config { "enable re-joining after a previous leave") cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join", "address of agent to join on startup") + cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartWanJoin), "join-wan", + "address of agent to join -wan on startup") cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join", "address of agent to join on startup with retry") cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0, "number of retries for joining") cmdFlags.StringVar(&retryInterval, "retry-interval", "", "interval between join attempts") + cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryWanJoin), "retry-wan-join", + "address of agent to join -wan on startup with retry") + cmdFlags.IntVar(&cmdConfig.RetryWanMaxAttempts, "retry-wan-max", 0, + "number of retries for joining -wan") + cmdFlags.StringVar(&retryWanInterval, "retry-wan-interval", "", + "interval between join -wan attempts") if err := cmdFlags.Parse(c.args); err != nil { return nil @@ -103,6 +112,15 @@ func (c *Command) readConfig() *Config { cmdConfig.RetryInterval = dur } + if retryWanInterval != "" { + dur, err := time.ParseDuration(retryWanInterval) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error: %s", err)) + return nil + } + cmdConfig.RetryWanInterval = dur + } + config := DefaultConfig() if len(configFiles) > 0 { fileConfig, err := ReadConfigPaths(configFiles) @@ -369,6 +387,22 @@ func (c *Command) startupJoin(config *Config) error { return nil } +// startupWanJoin is invoked to handle any joins -wan specified to take place at start time +func (c *Command) startupWanJoin(config *Config) error { + if len(config.StartWanJoin) == 0 { + return nil + } + + c.Ui.Output("Joining -wan cluster...") + n, err := c.agent.JoinWAN(config.StartWanJoin) + if err != nil { + return err + } + + c.Ui.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n)) + return nil +} + // retryJoin is used to handle retrying a join until it succeeds or all // retries are exhausted. func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) { @@ -400,6 +434,37 @@ func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) { } } +// retryWanJoin is used to handle retrying a join -wan until it succeeds or all +// retries are exhausted. +func (c *Command) retryWanJoin(config *Config, errCh chan<- struct{}) { + if len(config.RetryWanJoin) == 0 { + return + } + + logger := c.agent.logger + logger.Printf("[INFO] agent: Joining WAN cluster...") + + attempt := 0 + for { + n, err := c.agent.JoinWAN(config.RetryWanJoin) + if err == nil { + logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n) + return + } + + attempt++ + if config.RetryWanMaxAttempts > 0 && attempt > config.RetryWanMaxAttempts { + logger.Printf("[ERROR] agent: max join -wan retry exhausted, exiting") + close(errCh) + return + } + + logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err, + config.RetryWanInterval) + time.Sleep(config.RetryWanInterval) + } +} + func (c *Command) Run(args []string) int { c.Ui = &cli.PrefixedUi{ OutputPrefix: "==> ", @@ -482,6 +547,12 @@ func (c *Command) Run(args []string) int { return 1 } + // Join startup nodes if specified + if err := c.startupWanJoin(config); err != nil { + c.Ui.Error(err.Error()) + return 1 + } + // Register the services for _, service := range config.Services { ns := service.NodeService() @@ -542,12 +613,16 @@ func (c *Command) Run(args []string) int { errCh := make(chan struct{}) go c.retryJoin(config, errCh) + // Start retry -wan join process + errWanCh := make(chan struct{}) + go c.retryWanJoin(config, errWanCh) + // Wait for exit - return c.handleSignals(config, errCh) + return c.handleSignals(config, errCh, errWanCh) } // handleSignals blocks until we get an exit-causing signal -func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int { +func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retryWanJoin <-chan struct{}) int { signalCh := make(chan os.Signal, 4) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) @@ -563,6 +638,8 @@ WAIT: sig = os.Interrupt case <-retryJoin: return 1 + case <-retryWanJoin: + return 1 case <-c.agent.ShutdownCh(): // Agent is already shutdown! return 0 @@ -721,11 +798,18 @@ Options: -encrypt=key Provides the gossip encryption key -join=1.2.3.4 Address of an agent to join at start time. Can be specified multiple times. + -join-wan=1.2.3.4 Address of an agent to join -wan at start time. + Can be specified multiple times. -retry-join=1.2.3.4 Address of an agent to join at start time with retries enabled. Can be specified multiple times. -retry-interval=30s Time to wait between join attempts. -retry-max=0 Maximum number of join attempts. Defaults to 0, which will retry indefinitely. + -retry-wan-join=1.2.3.4 Address of an agent to join -wan at start time with + retries enabled. Can be specified multiple times. + -retry-wan-interval=30s Time to wait between join -wan attempts. + -retry-wan-max=0 Maximum number of join -wan attempts. Defaults to 0, which + will retry indefinitely. -log-level=info Log level of the agent. -node=hostname Name of this node. Must be unique in the cluster -protocol=N Sets the protocol version. Defaults to latest. diff --git a/command/agent/command_test.go b/command/agent/command_test.go index 81934a1ae6..e2dd634d57 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -121,3 +121,86 @@ func TestRetryJoinFail(t *testing.T) { t.Fatalf("bad: %d", code) } } +func TestRetryWanJoin(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + conf2 := nextConfig() + tmpDir, err := ioutil.TempDir("", "consul") + if err != nil { + t.Fatalf("err: %s", err) + } + defer os.RemoveAll(tmpDir) + + doneCh := make(chan struct{}) + shutdownCh := make(chan struct{}) + + defer func() { + close(shutdownCh) + <-doneCh + }() + + cmd := &Command{ + ShutdownCh: shutdownCh, + Ui: new(cli.MockUi), + } + + serfAddr := fmt.Sprintf( + "%s:%d", + agent.config.BindAddr, + agent.config.Ports.SerfLan) + + args := []string{ + "-data-dir", tmpDir, + "-node", fmt.Sprintf(`"%s"`, conf2.NodeName), + "-retry-wan-join", serfAddr, + "-retry-interval", "1s", + } + + go func() { + if code := cmd.Run(args); code != 0 { + log.Printf("bad: %d", code) + } + close(doneCh) + }() + + testutil.WaitForResult(func() (bool, error) { + mem := agent.WANMembers() + if len(mem) != 2 { + return false, fmt.Errorf("bad: %#v", mem) + } + return true, nil + }, func(err error) { + t.Fatalf(err.Error()) + }) +} + +func TestRetryWanJoinFail(t *testing.T) { + conf := nextConfig() + tmpDir, err := ioutil.TempDir("", "consul") + if err != nil { + t.Fatalf("err: %s", err) + } + defer os.RemoveAll(tmpDir) + + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + cmd := &Command{ + ShutdownCh: shutdownCh, + Ui: new(cli.MockUi), + } + + serfAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.Ports.SerfWan) + + args := []string{ + "-data-dir", tmpDir, + "-retry-wan-join", serfAddr, + "-retry-max", "1", + } + + if code := cmd.Run(args); code == 0 { + t.Fatalf("bad: %d", code) + } +} diff --git a/command/agent/config.go b/command/agent/config.go index 3a926499f2..ddff17164d 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -194,6 +194,11 @@ type Config struct { // addresses, then the agent will error and exit. StartJoin []string `mapstructure:"start_join"` + // StartWanJoin is a list of addresses to attempt to join -wan when the + // agent starts. If Serf is unable to communicate with any of these + // addresses, then the agent will error and exit. + StartWanJoin []string `mapstructure:"start_wan_join"` + // RetryJoin is a list of addresses to join with retry enabled. RetryJoin []string `mapstructure:"retry_join"` @@ -208,6 +213,20 @@ type Config struct { RetryInterval time.Duration `mapstructure:"-" json:"-"` RetryIntervalRaw string `mapstructure:"retry_interval"` + // RetryWanJoin is a list of addresses to join -wan with retry enabled. + RetryWanJoin []string `mapstructure:"retry_wan_join"` + + // RetryWanMaxAttempts specifies the maximum number of times to retry joining a + // -wan host on startup. This is useful for cases where we know the node will be + // online eventually. + RetryWanMaxAttempts int `mapstructure:"retry_wan_max"` + + // RetryWanInterval specifies the amount of time to wait in between join + // -wan attempts on agent start. The minimum allowed value is 1 second and + // the default is 30s. + RetryWanInterval time.Duration `mapstructure:"-" json:"-"` + RetryWanIntervalRaw string `mapstructure:"retry_wan_interval"` + // UiDir is the directory containing the Web UI resources. // If provided, the UI endpoints will be enabled. UiDir string `mapstructure:"ui_dir"` @@ -348,6 +367,7 @@ func DefaultConfig() *Config { ACLDownPolicy: "extend-cache", ACLDefaultPolicy: "allow", RetryInterval: 30 * time.Second, + RetryWanInterval: 30 * time.Second, } } @@ -505,6 +525,14 @@ func DecodeConfig(r io.Reader) (*Config, error) { result.RetryInterval = dur } + if raw := result.RetryWanIntervalRaw; raw != "" { + dur, err := time.ParseDuration(raw) + if err != nil { + return nil, fmt.Errorf("RetryWanInterval invalid: %v", err) + } + result.RetryWanInterval = dur + } + // Merge the single recursor if result.DNSRecursor != "" { result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor) @@ -750,6 +778,12 @@ func MergeConfig(a, b *Config) *Config { if b.RetryInterval != 0 { result.RetryInterval = b.RetryInterval } + if b.RetryWanMaxAttempts != 0 { + result.RetryWanMaxAttempts = b.RetryWanMaxAttempts + } + if b.RetryWanInterval != 0 { + result.RetryWanInterval = b.RetryWanInterval + } if b.DNSConfig.NodeTTL != 0 { result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL } @@ -816,11 +850,21 @@ func MergeConfig(a, b *Config) *Config { result.StartJoin = append(result.StartJoin, a.StartJoin...) result.StartJoin = append(result.StartJoin, b.StartJoin...) + // Copy the start join addresses + result.StartWanJoin = make([]string, 0, len(a.StartWanJoin)+len(b.StartWanJoin)) + result.StartWanJoin = append(result.StartWanJoin, a.StartWanJoin...) + result.StartWanJoin = append(result.StartWanJoin, b.StartWanJoin...) + // Copy the retry join addresses result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin)) result.RetryJoin = append(result.RetryJoin, a.RetryJoin...) result.RetryJoin = append(result.RetryJoin, b.RetryJoin...) + // Copy the retry join -wan addresses + result.RetryWanJoin = make([]string, 0, len(a.RetryWanJoin)+len(b.RetryWanJoin)) + result.RetryWanJoin = append(result.RetryWanJoin, a.RetryWanJoin...) + result.RetryWanJoin = append(result.RetryWanJoin, b.RetryWanJoin...) + return &result } diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 77e67a1728..420c6c4a6d 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -274,6 +274,23 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } + // Start Wan join + input = `{"start_wan_join": ["1.1.1.1", "2.2.2.2"]}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if len(config.StartWanJoin) != 2 { + t.Fatalf("bad: %#v", config) + } + if config.StartWanJoin[0] != "1.1.1.1" { + t.Fatalf("bad: %#v", config) + } + if config.StartWanJoin[1] != "2.2.2.2" { + t.Fatalf("bad: %#v", config) + } + // Retry join input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -316,6 +333,48 @@ func TestDecodeConfig(t *testing.T) { t.Fatalf("bad: %#v", config) } + // Retry WAN join + input = `{"retry_wan_join": ["1.1.1.1", "2.2.2.2"]}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if len(config.RetryWanJoin) != 2 { + t.Fatalf("bad: %#v", config) + } + if config.RetryWanJoin[0] != "1.1.1.1" { + t.Fatalf("bad: %#v", config) + } + if config.RetryWanJoin[1] != "2.2.2.2" { + t.Fatalf("bad: %#v", config) + } + + // Retry WAN interval + input = `{"retry_wan_interval": "10s"}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if config.RetryWanIntervalRaw != "10s" { + t.Fatalf("bad: %#v", config) + } + if config.RetryWanInterval.String() != "10s" { + t.Fatalf("bad: %#v", config) + } + + // Retry WAN Max + input = `{"retry_wan_max": 3}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if config.RetryWanMaxAttempts != 3 { + t.Fatalf("bad: %#v", config) + } + // UI Dir input = `{"ui_dir": "/opt/consul-ui"}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) @@ -860,12 +919,16 @@ func TestMergeConfig(t *testing.T) { Checks: []*CheckDefinition{nil}, Services: []*ServiceDefinition{nil}, StartJoin: []string{"1.1.1.1"}, + StartWanJoin: []string{"1.1.1.1"}, UiDir: "/opt/consul-ui", EnableSyslog: true, RejoinAfterLeave: true, RetryJoin: []string{"1.1.1.1"}, RetryIntervalRaw: "10s", RetryInterval: 10 * time.Second, + RetryWanJoin: []string{"1.1.1.1"}, + RetryWanIntervalRaw: "10s", + RetryWanInterval: 10 * time.Second, CheckUpdateInterval: 8 * time.Minute, CheckUpdateIntervalRaw: "8m", ACLToken: "1234", diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index de3bbd78a7..bf817f0852 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -105,6 +105,21 @@ The options below are all specified on the command-line. with return code 1. By default, this is set to 0, which will continue to retry the join indefinitely. +* `-join-wan` - Address of another wan agent to join upon starting up. This can be + specified multiple times to specify multiple agents that are on the WAN to join. If Consul is + unable to join with any of the specified addresses, agent startup will + fail. By default, the agent won't join -wan any nodes when it starts up. + +* `-retry-wan-join` - Similar to `retry-join`, but allows retrying a wan join if the first + attempt fails. This is useful for cases where we know the address will become + available eventually. + +* `-retry-wan-interval` - Time to wait between join -wan attempts. Defaults to 30s. + +* `-retry-wan-max` - The maximum number of join -wan attempts to be made before exiting + with return code 1. By default, this is set to 0, which will continue to + retry the join -wan indefinitely. + * `-log-level` - The level of logging to show after the Consul agent has started. This defaults to "info". The available log levels are "trace", "debug", "info", "warn", "err". This is the log level that will be shown @@ -339,6 +354,12 @@ definitions support being updated during a reload. * `retry_interval` - Equivalent to the `-retry-interval` command-line flag. +* `retry_wan_join` - Equivalent to the `-retry-wan-join` command-line flag. Takes a list + of addresses to attempt joining to WAN every `retry_wan_interval` until at least one + join -wan works. + +* `retry_wan_interval` - Equivalent to the `-retry-wan-interval` command-line flag. + * `server` - Equivalent to the `-server` command-line flag. * `server_name` - When give, this overrides the `node_name` for the TLS certificate. @@ -353,6 +374,9 @@ definitions support being updated during a reload. * `start_join` - An array of strings specifying addresses of nodes to join upon startup. +* `start_wan_join` - An array of strings specifying addresses of WAN nodes to + join -wan upon startup. + * `statsd_addr` - This provides the address of a statsd instance. If provided Consul will send various telemetry information to that instance for aggregation. This can be used to capture various runtime information. This sends UDP packets