diff --git a/agent/agent.go b/agent/agent.go index 949892bca3..92055bb125 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -104,7 +104,7 @@ type Agent struct { // state stores a local representation of the node, // services and checks. Used for anti-entropy. - state localState + state *localState // checkReapAfter maps the check ID to a timeout after which we should // reap its associated service @@ -241,15 +241,27 @@ func (a *Agent) Start() error { return fmt.Errorf("Failed to setup node ID: %v", err) } + // create the local state + a.state = NewLocalState(c, a.logger) + + // create the config for the rpc server/client + consulCfg, err := a.consulConfig() + if err != nil { + return err + } + + // link consul client/server with the state + consulCfg.ServerUp = a.state.ConsulServerUp + // Setup either the client or the server. if c.Server { - server, err := a.makeServer() + server, err := consul.NewServerLogger(consulCfg, a.logger) if err != nil { - return err + return fmt.Errorf("Failed to start Consul server: %v", err) } a.delegate = server - a.state.Init(c, a.logger, server) + a.state.delegate = server // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ @@ -261,13 +273,13 @@ func (a *Agent) Start() error { a.state.AddService(&consulService, c.GetTokenForAgent()) } else { - client, err := a.makeClient() + client, err := consul.NewClientLogger(consulCfg, a.logger) if err != nil { - return err + return fmt.Errorf("Failed to start Consul client: %v", err) } a.delegate = client - a.state.Init(c, a.logger, client) + a.state.delegate = client } // Load checks/services/metadata. @@ -774,9 +786,6 @@ func (a *Agent) consulConfig() (*consul.Config, error) { base.TLSCipherSuites = a.config.TLSCipherSuites base.TLSPreferServerCipherSuites = a.config.TLSPreferServerCipherSuites - // Setup the ServerUp callback - base.ServerUp = a.state.ConsulServerUp - // Setup the user event callback base.UserEventHandler = func(e serf.UserEvent) { select { @@ -787,6 +796,13 @@ func (a *Agent) consulConfig() (*consul.Config, error) { // Setup the loggers base.LogOutput = a.LogOutput + + if !a.config.DisableKeyringFile { + if err := a.setupKeyrings(base); err != nil { + return nil, fmt.Errorf("Failed to configure keyring: %v", err) + } + } + return base, nil } @@ -897,42 +913,6 @@ func (a *Agent) resolveTmplAddrs() error { return nil } -// makeServer creates a new consul server. -func (a *Agent) makeServer() (*consul.Server, error) { - config, err := a.consulConfig() - if err != nil { - return nil, err - } - if !a.config.DisableKeyringFile { - if err := a.setupKeyrings(config); err != nil { - return nil, fmt.Errorf("Failed to configure keyring: %v", err) - } - } - server, err := consul.NewServerLogger(config, a.logger) - if err != nil { - return nil, fmt.Errorf("Failed to start Consul server: %v", err) - } - return server, nil -} - -// makeClient creates a new consul client. -func (a *Agent) makeClient() (*consul.Client, error) { - config, err := a.consulConfig() - if err != nil { - return nil, err - } - if !a.config.DisableKeyringFile { - if err := a.setupKeyrings(config); err != nil { - return nil, fmt.Errorf("Failed to configure keyring: %v", err) - } - } - client, err := consul.NewClientLogger(config, a.logger) - if err != nil { - return nil, fmt.Errorf("Failed to start Consul client: %v", err) - } - return client, nil -} - // makeRandomID will generate a random UUID for a node. func (a *Agent) makeRandomID() (string, error) { id, err := uuid.GenerateUUID() @@ -1644,7 +1624,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } ttl := &CheckTTL{ - Notify: &a.state, + Notify: a.state, CheckID: check.CheckID, TTL: chkType.TTL, Logger: a.logger, @@ -1670,7 +1650,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } http := &CheckHTTP{ - Notify: &a.state, + Notify: a.state, CheckID: check.CheckID, HTTP: chkType.HTTP, Header: chkType.Header, @@ -1694,7 +1674,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } tcp := &CheckTCP{ - Notify: &a.state, + Notify: a.state, CheckID: check.CheckID, TCP: chkType.TCP, Interval: chkType.Interval, @@ -1715,7 +1695,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } dockerCheck := &CheckDocker{ - Notify: &a.state, + Notify: a.state, CheckID: check.CheckID, DockerContainerID: chkType.DockerContainerID, Shell: chkType.Shell, @@ -1739,7 +1719,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType, } monitor := &CheckMonitor{ - Notify: &a.state, + Notify: a.state, CheckID: check.CheckID, Script: chkType.Script, Interval: chkType.Interval, diff --git a/agent/local.go b/agent/local.go index 70c9a8ce5c..e4b8cdc359 100644 --- a/agent/local.go +++ b/agent/local.go @@ -74,22 +74,23 @@ type localState struct { triggerCh chan struct{} } -// Init is used to initialize the local state -func (l *localState) Init(c *Config, lg *log.Logger, d delegate) { - l.config = c - l.delegate = d - l.logger = lg - l.services = make(map[string]*structs.NodeService) - l.serviceStatus = make(map[string]syncStatus) - l.serviceTokens = make(map[string]string) - l.checks = make(map[types.CheckID]*structs.HealthCheck) - l.checkStatus = make(map[types.CheckID]syncStatus) - l.checkTokens = make(map[types.CheckID]string) - l.checkCriticalTime = make(map[types.CheckID]time.Time) - l.deferCheck = make(map[types.CheckID]*time.Timer) - l.metadata = make(map[string]string) - l.consulCh = make(chan struct{}, 1) - l.triggerCh = make(chan struct{}, 1) +// NewLocalState creates a is used to initialize the local state +func NewLocalState(c *Config, lg *log.Logger) *localState { + return &localState{ + config: c, + logger: lg, + services: make(map[string]*structs.NodeService), + serviceStatus: make(map[string]syncStatus), + serviceTokens: make(map[string]string), + checks: make(map[types.CheckID]*structs.HealthCheck), + checkStatus: make(map[types.CheckID]syncStatus), + checkTokens: make(map[types.CheckID]string), + checkCriticalTime: make(map[types.CheckID]time.Time), + deferCheck: make(map[types.CheckID]*time.Timer), + metadata: make(map[string]string), + consulCh: make(chan struct{}, 1), + triggerCh: make(chan struct{}, 1), + } } // changeMade is used to trigger an anti-entropy run diff --git a/agent/local_test.go b/agent/local_test.go index 57e639cf08..4c80540472 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -1419,8 +1419,7 @@ func TestAgent_serviceTokens(t *testing.T) { t.Parallel() cfg := TestConfig() cfg.ACLToken = "default" - l := new(localState) - l.Init(cfg, nil, nil) + l := NewLocalState(cfg, nil) l.AddService(&structs.NodeService{ ID: "redis", @@ -1448,8 +1447,7 @@ func TestAgent_checkTokens(t *testing.T) { t.Parallel() cfg := TestConfig() cfg.ACLToken = "default" - l := new(localState) - l.Init(cfg, nil, nil) + l := NewLocalState(cfg, nil) // Returns default when no token is set if token := l.CheckToken("mem"); token != "default" { @@ -1472,8 +1470,7 @@ func TestAgent_checkTokens(t *testing.T) { func TestAgent_checkCriticalTime(t *testing.T) { t.Parallel() cfg := TestConfig() - l := new(localState) - l.Init(cfg, nil, nil) + l := NewLocalState(cfg, nil) // Add a passing check and make sure it's not critical. checkID := types.CheckID("redis:1")