Browse Source

Merge pull request #477 from amalaviy/retry_wan_join

Add start-wan-join, retry-wan-join and related configuration and commandline options
pull/480/head
Armon Dadgar 10 years ago
parent
commit
3b3f082283
  1. 88
      command/agent/command.go
  2. 83
      command/agent/command_test.go
  3. 44
      command/agent/config.go
  4. 63
      command/agent/config_test.go
  5. 24
      website/source/docs/agent/options.html.markdown

88
command/agent/command.go

@ -53,6 +53,7 @@ func (c *Command) readConfig() *Config {
var cmdConfig Config
var configFiles []string
var retryInterval string
var retryIntervalWan string
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
@ -83,12 +84,20 @@ func (c *Command) readConfig() *Config {
"enable re-joining after a previous leave")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
"address of agent to join on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoinWan), "join-wan",
"address of agent to join -wan on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join",
"address of agent to join on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0,
"number of retries for joining")
cmdFlags.StringVar(&retryInterval, "retry-interval", "",
"interval between join attempts")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoinWan), "retry-join-wan",
"address of agent to join -wan on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttemptsWan, "retry-max-wan", 0,
"number of retries for joining -wan")
cmdFlags.StringVar(&retryIntervalWan, "retry-interval-wan", "",
"interval between join -wan attempts")
if err := cmdFlags.Parse(c.args); err != nil {
return nil
@ -103,6 +112,15 @@ func (c *Command) readConfig() *Config {
cmdConfig.RetryInterval = dur
}
if retryIntervalWan != "" {
dur, err := time.ParseDuration(retryIntervalWan)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.RetryIntervalWan = dur
}
config := DefaultConfig()
if len(configFiles) > 0 {
fileConfig, err := ReadConfigPaths(configFiles)
@ -369,6 +387,22 @@ func (c *Command) startupJoin(config *Config) error {
return nil
}
// startupJoinWan is invoked to handle any joins -wan specified to take place at start time
func (c *Command) startupJoinWan(config *Config) error {
if len(config.StartJoinWan) == 0 {
return nil
}
c.Ui.Output("Joining -wan cluster...")
n, err := c.agent.JoinWAN(config.StartJoinWan)
if err != nil {
return err
}
c.Ui.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n))
return nil
}
// retryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted.
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
@ -400,6 +434,37 @@ func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
}
}
// retryJoinWan is used to handle retrying a join -wan until it succeeds or all
// retries are exhausted.
func (c *Command) retryJoinWan(config *Config, errCh chan<- struct{}) {
if len(config.RetryJoinWan) == 0 {
return
}
logger := c.agent.logger
logger.Printf("[INFO] agent: Joining WAN cluster...")
attempt := 0
for {
n, err := c.agent.JoinWAN(config.RetryJoinWan)
if err == nil {
logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n)
return
}
attempt++
if config.RetryMaxAttemptsWan > 0 && attempt > config.RetryMaxAttemptsWan {
logger.Printf("[ERROR] agent: max join -wan retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err,
config.RetryIntervalWan)
time.Sleep(config.RetryIntervalWan)
}
}
func (c *Command) Run(args []string) int {
c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ",
@ -482,6 +547,12 @@ func (c *Command) Run(args []string) int {
return 1
}
// Join startup nodes if specified
if err := c.startupJoinWan(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Register the services
for _, service := range config.Services {
ns := service.NodeService()
@ -542,12 +613,16 @@ func (c *Command) Run(args []string) int {
errCh := make(chan struct{})
go c.retryJoin(config, errCh)
// Start retry -wan join process
errWanCh := make(chan struct{})
go c.retryJoinWan(config, errWanCh)
// Wait for exit
return c.handleSignals(config, errCh)
return c.handleSignals(config, errCh, errWanCh)
}
// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int {
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retryJoinWan <-chan struct{}) int {
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@ -563,6 +638,8 @@ WAIT:
sig = os.Interrupt
case <-retryJoin:
return 1
case <-retryJoinWan:
return 1
case <-c.agent.ShutdownCh():
// Agent is already shutdown!
return 0
@ -721,11 +798,18 @@ Options:
-encrypt=key Provides the gossip encryption key
-join=1.2.3.4 Address of an agent to join at start time.
Can be specified multiple times.
-join-wan=1.2.3.4 Address of an agent to join -wan at start time.
Can be specified multiple times.
-retry-join=1.2.3.4 Address of an agent to join at start time with
retries enabled. Can be specified multiple times.
-retry-interval=30s Time to wait between join attempts.
-retry-max=0 Maximum number of join attempts. Defaults to 0, which
will retry indefinitely.
-retry-join-wan=1.2.3.4 Address of an agent to join -wan at start time with
retries enabled. Can be specified multiple times.
-retry-interval-wan=30s Time to wait between join -wan attempts.
-retry-max-wan=0 Maximum number of join -wan attempts. Defaults to 0, which
will retry indefinitely.
-log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster
-protocol=N Sets the protocol version. Defaults to latest.

83
command/agent/command_test.go

@ -121,3 +121,86 @@ func TestRetryJoinFail(t *testing.T) {
t.Fatalf("bad: %d", code)
}
}
func TestRetryJoinWan(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
conf2 := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
doneCh := make(chan struct{})
shutdownCh := make(chan struct{})
defer func() {
close(shutdownCh)
<-doneCh
}()
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
serfAddr := fmt.Sprintf(
"%s:%d",
agent.config.BindAddr,
agent.config.Ports.SerfLan)
args := []string{
"-data-dir", tmpDir,
"-node", fmt.Sprintf(`"%s"`, conf2.NodeName),
"-retry-join-wan", serfAddr,
"-retry-interval", "1s",
}
go func() {
if code := cmd.Run(args); code != 0 {
log.Printf("bad: %d", code)
}
close(doneCh)
}()
testutil.WaitForResult(func() (bool, error) {
mem := agent.WANMembers()
if len(mem) != 2 {
return false, fmt.Errorf("bad: %#v", mem)
}
return true, nil
}, func(err error) {
t.Fatalf(err.Error())
})
}
func TestRetryJoinWanFail(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
shutdownCh := make(chan struct{})
defer close(shutdownCh)
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
serfAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.Ports.SerfWan)
args := []string{
"-data-dir", tmpDir,
"-retry-join-wan", serfAddr,
"-retry-max", "1",
}
if code := cmd.Run(args); code == 0 {
t.Fatalf("bad: %d", code)
}
}

44
command/agent/config.go

@ -194,6 +194,11 @@ type Config struct {
// addresses, then the agent will error and exit.
StartJoin []string `mapstructure:"start_join"`
// StartJoinWan is a list of addresses to attempt to join -wan when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartJoinWan []string `mapstructure:"start_join_wan"`
// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"`
@ -208,6 +213,20 @@ type Config struct {
RetryInterval time.Duration `mapstructure:"-" json:"-"`
RetryIntervalRaw string `mapstructure:"retry_interval"`
// RetryJoinWan is a list of addresses to join -wan with retry enabled.
RetryJoinWan []string `mapstructure:"retry_join_wan"`
// RetryMaxAttemptsWan specifies the maximum number of times to retry joining a
// -wan host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttemptsWan int `mapstructure:"retry_max_wan"`
// RetryIntervalWan specifies the amount of time to wait in between join
// -wan attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryIntervalWan time.Duration `mapstructure:"-" json:"-"`
RetryIntervalWanRaw string `mapstructure:"retry_interval_wan"`
// UiDir is the directory containing the Web UI resources.
// If provided, the UI endpoints will be enabled.
UiDir string `mapstructure:"ui_dir"`
@ -348,6 +367,7 @@ func DefaultConfig() *Config {
ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow",
RetryInterval: 30 * time.Second,
RetryIntervalWan: 30 * time.Second,
}
}
@ -505,6 +525,14 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.RetryInterval = dur
}
if raw := result.RetryIntervalWanRaw; raw != "" {
dur, err := time.ParseDuration(raw)
if err != nil {
return nil, fmt.Errorf("RetryIntervalWan invalid: %v", err)
}
result.RetryIntervalWan = dur
}
// Merge the single recursor
if result.DNSRecursor != "" {
result.DNSRecursors = append(result.DNSRecursors, result.DNSRecursor)
@ -750,6 +778,12 @@ func MergeConfig(a, b *Config) *Config {
if b.RetryInterval != 0 {
result.RetryInterval = b.RetryInterval
}
if b.RetryMaxAttemptsWan != 0 {
result.RetryMaxAttemptsWan = b.RetryMaxAttemptsWan
}
if b.RetryIntervalWan != 0 {
result.RetryIntervalWan = b.RetryIntervalWan
}
if b.DNSConfig.NodeTTL != 0 {
result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL
}
@ -816,11 +850,21 @@ func MergeConfig(a, b *Config) *Config {
result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...)
// Copy the start join addresses
result.StartJoinWan = make([]string, 0, len(a.StartJoinWan)+len(b.StartJoinWan))
result.StartJoinWan = append(result.StartJoinWan, a.StartJoinWan...)
result.StartJoinWan = append(result.StartJoinWan, b.StartJoinWan...)
// Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
// Copy the retry join -wan addresses
result.RetryJoinWan = make([]string, 0, len(a.RetryJoinWan)+len(b.RetryJoinWan))
result.RetryJoinWan = append(result.RetryJoinWan, a.RetryJoinWan...)
result.RetryJoinWan = append(result.RetryJoinWan, b.RetryJoinWan...)
return &result
}

63
command/agent/config_test.go

@ -274,6 +274,23 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
// Start Join wan
input = `{"start_join_wan": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.StartJoinWan) != 2 {
t.Fatalf("bad: %#v", config)
}
if config.StartJoinWan[0] != "1.1.1.1" {
t.Fatalf("bad: %#v", config)
}
if config.StartJoinWan[1] != "2.2.2.2" {
t.Fatalf("bad: %#v", config)
}
// Retry join
input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -316,6 +333,48 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
// Retry Join wan
input = `{"retry_join_wan": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.RetryJoinWan) != 2 {
t.Fatalf("bad: %#v", config)
}
if config.RetryJoinWan[0] != "1.1.1.1" {
t.Fatalf("bad: %#v", config)
}
if config.RetryJoinWan[1] != "2.2.2.2" {
t.Fatalf("bad: %#v", config)
}
// Retry Interval wan
input = `{"retry_interval_wan": "10s"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryIntervalWanRaw != "10s" {
t.Fatalf("bad: %#v", config)
}
if config.RetryIntervalWan.String() != "10s" {
t.Fatalf("bad: %#v", config)
}
// Retry Max wan
input = `{"retry_max_wan": 3}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryMaxAttemptsWan != 3 {
t.Fatalf("bad: %#v", config)
}
// UI Dir
input = `{"ui_dir": "/opt/consul-ui"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -860,12 +919,16 @@ func TestMergeConfig(t *testing.T) {
Checks: []*CheckDefinition{nil},
Services: []*ServiceDefinition{nil},
StartJoin: []string{"1.1.1.1"},
StartJoinWan: []string{"1.1.1.1"},
UiDir: "/opt/consul-ui",
EnableSyslog: true,
RejoinAfterLeave: true,
RetryJoin: []string{"1.1.1.1"},
RetryIntervalRaw: "10s",
RetryInterval: 10 * time.Second,
RetryJoinWan: []string{"1.1.1.1"},
RetryIntervalWanRaw: "10s",
RetryIntervalWan: 10 * time.Second,
CheckUpdateInterval: 8 * time.Minute,
CheckUpdateIntervalRaw: "8m",
ACLToken: "1234",

24
website/source/docs/agent/options.html.markdown

@ -105,6 +105,21 @@ The options below are all specified on the command-line.
with return code 1. By default, this is set to 0, which will continue to
retry the join indefinitely.
* `-join-wan` - Address of another wan agent to join upon starting up. This can be
specified multiple times to specify multiple agents that are on the WAN to join. If Consul is
unable to join with any of the specified addresses, agent startup will
fail. By default, the agent won't join -wan any nodes when it starts up.
* `-retry-join-wan` - Similar to `retry-join`, but allows retrying a wan join if the first
attempt fails. This is useful for cases where we know the address will become
available eventually.
* `-retry-interval-wan` - Time to wait between join -wan attempts. Defaults to 30s.
* `-retry-max-wan` - The maximum number of join -wan attempts to be made before exiting
with return code 1. By default, this is set to 0, which will continue to
retry the join -wan indefinitely.
* `-log-level` - The level of logging to show after the Consul agent has
started. This defaults to "info". The available log levels are "trace",
"debug", "info", "warn", "err". This is the log level that will be shown
@ -339,6 +354,12 @@ definitions support being updated during a reload.
* `retry_interval` - Equivalent to the `-retry-interval` command-line flag.
* `retry_join_wan` - Equivalent to the `-retry-join-wan` command-line flag. Takes a list
of addresses to attempt joining to WAN every `retry_interval_wan` until at least one
join -wan works.
* `retry_interval_wan` - Equivalent to the `-retry-interval-wan` command-line flag.
* `server` - Equivalent to the `-server` command-line flag.
* `server_name` - When give, this overrides the `node_name` for the TLS certificate.
@ -353,6 +374,9 @@ definitions support being updated during a reload.
* `start_join` - An array of strings specifying addresses of nodes to
join upon startup.
* `start_join_wan` - An array of strings specifying addresses of WAN nodes to
join -wan upon startup.
* `statsd_addr` - This provides the address of a statsd instance. If provided
Consul will send various telemetry information to that instance for aggregation.
This can be used to capture various runtime information. This sends UDP packets

Loading…
Cancel
Save