Merge pull request #396 from ryanuber/f-retry-join

agent: Retry failed joins on agent start
pull/397/head
Armon Dadgar 2014-10-13 10:46:10 -07:00
commit bee5fe280c
5 changed files with 241 additions and 3 deletions

View File

@ -52,6 +52,7 @@ type Command struct {
func (c *Command) readConfig() *Config { func (c *Command) readConfig() *Config {
var cmdConfig Config var cmdConfig Config
var configFiles []string var configFiles []string
var retryInterval string
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
@ -82,11 +83,26 @@ func (c *Command) readConfig() *Config {
"enable re-joining after a previous leave") "enable re-joining after a previous leave")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join", cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
"address of agent to join on startup") "address of agent to join on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join",
"address of agent to join on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0,
"number of retries for joining")
cmdFlags.StringVar(&retryInterval, "retry-interval", "",
"interval between join attempts")
if err := cmdFlags.Parse(c.args); err != nil { if err := cmdFlags.Parse(c.args); err != nil {
return nil return nil
} }
if retryInterval != "" {
dur, err := time.ParseDuration(retryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.RetryInterval = dur
}
config := DefaultConfig() config := DefaultConfig()
if len(configFiles) > 0 { if len(configFiles) > 0 {
fileConfig, err := ReadConfigPaths(configFiles) fileConfig, err := ReadConfigPaths(configFiles)
@ -353,6 +369,37 @@ func (c *Command) startupJoin(config *Config) error {
return nil return nil
} }
// retryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted.
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
if len(config.RetryJoin) == 0 {
return
}
logger := c.agent.logger
logger.Printf("[INFO] agent: Joining cluster...")
attempt := 0
for {
n, err := c.agent.JoinLAN(config.RetryJoin)
if err == nil {
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
attempt++
if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts {
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.RetryInterval)
time.Sleep(config.RetryInterval)
}
}
func (c *Command) Run(args []string) int { func (c *Command) Run(args []string) int {
c.Ui = &cli.PrefixedUi{ c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ", OutputPrefix: "==> ",
@ -491,12 +538,16 @@ func (c *Command) Run(args []string) int {
c.Ui.Output("Log data will now stream in as it occurs:\n") c.Ui.Output("Log data will now stream in as it occurs:\n")
logGate.Flush() logGate.Flush()
// Start retry join process
errCh := make(chan struct{})
go c.retryJoin(config, errCh)
// Wait for exit // Wait for exit
return c.handleSignals(config) return c.handleSignals(config, errCh)
} }
// handleSignals blocks until we get an exit-causing signal // handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config) int { func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int {
signalCh := make(chan os.Signal, 4) signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
@ -510,6 +561,8 @@ WAIT:
sig = syscall.SIGHUP sig = syscall.SIGHUP
case <-c.ShutdownCh: case <-c.ShutdownCh:
sig = os.Interrupt sig = os.Interrupt
case <-retryJoin:
return 1
case <-c.agent.ShutdownCh(): case <-c.agent.ShutdownCh():
// Agent is already shutdown! // Agent is already shutdown!
return 0 return 0
@ -668,6 +721,11 @@ Options:
-encrypt=key Provides the gossip encryption key -encrypt=key Provides the gossip encryption key
-join=1.2.3.4 Address of an agent to join at start time. -join=1.2.3.4 Address of an agent to join at start time.
Can be specified multiple times. Can be specified multiple times.
-retry-join=1.2.3.4 Address of an agent to join at start time with
retries enabled. Can be specified multiple times.
-retry-interval=30s Time to wait between join attempts.
-retry-max=0 Maximum number of join attempts. Defaults to 0, which
will retry indefinitely.
-log-level=info Log level of the agent. -log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster -node=hostname Name of this node. Must be unique in the cluster
-protocol=N Sets the protocol version. Defaults to latest. -protocol=N Sets the protocol version. Defaults to latest.

View File

@ -1,8 +1,14 @@
package agent package agent
import ( import (
"github.com/mitchellh/cli" "fmt"
"io/ioutil"
"log"
"os"
"testing" "testing"
"github.com/hashicorp/consul/testutil"
"github.com/mitchellh/cli"
) )
func TestCommand_implements(t *testing.T) { func TestCommand_implements(t *testing.T) {
@ -31,3 +37,87 @@ func TestValidDatacenter(t *testing.T) {
} }
} }
} }
func TestRetryJoin(t *testing.T) {
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
conf2 := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
doneCh := make(chan struct{})
shutdownCh := make(chan struct{})
defer func() {
close(shutdownCh)
<-doneCh
}()
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
serfAddr := fmt.Sprintf(
"%s:%d",
agent.config.BindAddr,
agent.config.Ports.SerfLan)
args := []string{
"-data-dir", tmpDir,
"-node", conf2.NodeName,
"-retry-join", serfAddr,
"-retry-interval", "1s",
}
go func() {
if code := cmd.Run(args); code != 0 {
log.Printf("bad: %d", code)
}
close(doneCh)
}()
testutil.WaitForResult(func() (bool, error) {
mem := agent.LANMembers()
if len(mem) != 2 {
return false, fmt.Errorf("bad: %#v", mem)
}
return true, nil
}, func(err error) {
t.Fatalf(err.Error())
})
}
func TestRetryJoinFail(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)
shutdownCh := make(chan struct{})
defer close(shutdownCh)
cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}
serfAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.Ports.SerfLan)
args := []string{
"-data-dir", tmpDir,
"-retry-join", serfAddr,
"-retry-max", "1",
}
if code := cmd.Run(args); code == 0 {
t.Fatalf("bad: %d", code)
}
}

View File

@ -189,6 +189,20 @@ type Config struct {
// addresses, then the agent will error and exit. // addresses, then the agent will error and exit.
StartJoin []string `mapstructure:"start_join"` StartJoin []string `mapstructure:"start_join"`
// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"`
// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `mapstructure:"retry_max"`
// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval time.Duration `mapstructure:"-" json:"-"`
RetryIntervalRaw string `mapstructure:"retry_interval"`
// UiDir is the directory containing the Web UI resources. // UiDir is the directory containing the Web UI resources.
// If provided, the UI endpoints will be enabled. // If provided, the UI endpoints will be enabled.
UiDir string `mapstructure:"ui_dir"` UiDir string `mapstructure:"ui_dir"`
@ -328,6 +342,7 @@ func DefaultConfig() *Config {
ACLTTL: 30 * time.Second, ACLTTL: 30 * time.Second,
ACLDownPolicy: "extend-cache", ACLDownPolicy: "extend-cache",
ACLDefaultPolicy: "allow", ACLDefaultPolicy: "allow",
RetryInterval: 30 * time.Second,
} }
} }
@ -450,6 +465,14 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.ACLTTL = dur result.ACLTTL = dur
} }
if raw := result.RetryIntervalRaw; raw != "" {
dur, err := time.ParseDuration(raw)
if err != nil {
return nil, fmt.Errorf("RetryInterval invalid: %v", err)
}
result.RetryInterval = dur
}
return &result, nil return &result, nil
} }
@ -681,6 +704,12 @@ func MergeConfig(a, b *Config) *Config {
if b.RejoinAfterLeave { if b.RejoinAfterLeave {
result.RejoinAfterLeave = true result.RejoinAfterLeave = true
} }
if b.RetryMaxAttempts != 0 {
result.RetryMaxAttempts = b.RetryMaxAttempts
}
if b.RetryInterval != 0 {
result.RetryInterval = b.RetryInterval
}
if b.DNSConfig.NodeTTL != 0 { if b.DNSConfig.NodeTTL != 0 {
result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL
} }
@ -747,6 +776,11 @@ func MergeConfig(a, b *Config) *Config {
result.StartJoin = append(result.StartJoin, a.StartJoin...) result.StartJoin = append(result.StartJoin, a.StartJoin...)
result.StartJoin = append(result.StartJoin, b.StartJoin...) result.StartJoin = append(result.StartJoin, b.StartJoin...)
// Copy the retry join addresses
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
return &result return &result
} }

View File

@ -265,6 +265,48 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config) t.Fatalf("bad: %#v", config)
} }
// Retry join
input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if len(config.RetryJoin) != 2 {
t.Fatalf("bad: %#v", config)
}
if config.RetryJoin[0] != "1.1.1.1" {
t.Fatalf("bad: %#v", config)
}
if config.RetryJoin[1] != "2.2.2.2" {
t.Fatalf("bad: %#v", config)
}
// Retry interval
input = `{"retry_interval": "10s"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryIntervalRaw != "10s" {
t.Fatalf("bad: %#v", config)
}
if config.RetryInterval.String() != "10s" {
t.Fatalf("bad: %#v", config)
}
// Retry Max
input = `{"retry_max": 3}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.RetryMaxAttempts != 3 {
t.Fatalf("bad: %#v", config)
}
// UI Dir // UI Dir
input = `{"ui_dir": "/opt/consul-ui"}` input = `{"ui_dir": "/opt/consul-ui"}`
config, err = DecodeConfig(bytes.NewReader([]byte(input))) config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -572,6 +614,7 @@ func TestMergeConfig(t *testing.T) {
SkipLeaveOnInt: false, SkipLeaveOnInt: false,
EnableDebug: false, EnableDebug: false,
CheckUpdateIntervalRaw: "8m", CheckUpdateIntervalRaw: "8m",
RetryIntervalRaw: "10s",
} }
b := &Config{ b := &Config{
@ -623,6 +666,9 @@ func TestMergeConfig(t *testing.T) {
UiDir: "/opt/consul-ui", UiDir: "/opt/consul-ui",
EnableSyslog: true, EnableSyslog: true,
RejoinAfterLeave: true, RejoinAfterLeave: true,
RetryJoin: []string{"1.1.1.1"},
RetryIntervalRaw: "10s",
RetryInterval: 10 * time.Second,
CheckUpdateInterval: 8 * time.Minute, CheckUpdateInterval: 8 * time.Minute,
CheckUpdateIntervalRaw: "8m", CheckUpdateIntervalRaw: "8m",
ACLToken: "1234", ACLToken: "1234",

View File

@ -91,6 +91,16 @@ The options below are all specified on the command-line.
unable to join with any of the specified addresses, agent startup will unable to join with any of the specified addresses, agent startup will
fail. By default, the agent won't join any nodes when it starts up. fail. By default, the agent won't join any nodes when it starts up.
* `-retry-join` - Similar to `-join`, but allows retrying a join if the first
attempt fails. This is useful for cases where we know the address will become
available eventually.
* `-retry-interval` - Time to wait between join attempts. Defaults to 30s.
* `-retry-max` - The maximum number of join attempts to be made before exiting
with return code 1. By default, this is set to 0, which will continue to
retry the join indefinitely.
* `-log-level` - The level of logging to show after the Consul agent has * `-log-level` - The level of logging to show after the Consul agent has
started. This defaults to "info". The available log levels are "trace", started. This defaults to "info". The available log levels are "trace",
"debug", "info", "warn", "err". This is the log level that will be shown "debug", "info", "warn", "err". This is the log level that will be shown