Browse Source

agent: fix data race between consul server and local state

pull/3241/head
Frank Schroeder 8 years ago committed by Frank Schröder
parent
commit
0763788b82
  1. 82
      agent/agent.go
  2. 33
      agent/local.go
  3. 9
      agent/local_test.go

82
agent/agent.go

@ -104,7 +104,7 @@ type Agent struct {
// state stores a local representation of the node, // state stores a local representation of the node,
// services and checks. Used for anti-entropy. // services and checks. Used for anti-entropy.
state localState state *localState
// checkReapAfter maps the check ID to a timeout after which we should // checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service // reap its associated service
@ -241,15 +241,27 @@ func (a *Agent) Start() error {
return fmt.Errorf("Failed to setup node ID: %v", err) return fmt.Errorf("Failed to setup node ID: %v", err)
} }
// create the local state
a.state = NewLocalState(c, a.logger)
// create the config for the rpc server/client
consulCfg, err := a.consulConfig()
if err != nil {
return err
}
// link consul client/server with the state
consulCfg.ServerUp = a.state.ConsulServerUp
// Setup either the client or the server. // Setup either the client or the server.
if c.Server { if c.Server {
server, err := a.makeServer() server, err := consul.NewServerLogger(consulCfg, a.logger)
if err != nil { if err != nil {
return err return fmt.Errorf("Failed to start Consul server: %v", err)
} }
a.delegate = server a.delegate = server
a.state.Init(c, a.logger, server) a.state.delegate = server
// Automatically register the "consul" service on server nodes // Automatically register the "consul" service on server nodes
consulService := structs.NodeService{ consulService := structs.NodeService{
@ -261,13 +273,13 @@ func (a *Agent) Start() error {
a.state.AddService(&consulService, c.GetTokenForAgent()) a.state.AddService(&consulService, c.GetTokenForAgent())
} else { } else {
client, err := a.makeClient() client, err := consul.NewClientLogger(consulCfg, a.logger)
if err != nil { if err != nil {
return err return fmt.Errorf("Failed to start Consul client: %v", err)
} }
a.delegate = client a.delegate = client
a.state.Init(c, a.logger, client) a.state.delegate = client
} }
// Load checks/services/metadata. // Load checks/services/metadata.
@ -774,9 +786,6 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.TLSCipherSuites = a.config.TLSCipherSuites base.TLSCipherSuites = a.config.TLSCipherSuites
base.TLSPreferServerCipherSuites = a.config.TLSPreferServerCipherSuites base.TLSPreferServerCipherSuites = a.config.TLSPreferServerCipherSuites
// Setup the ServerUp callback
base.ServerUp = a.state.ConsulServerUp
// Setup the user event callback // Setup the user event callback
base.UserEventHandler = func(e serf.UserEvent) { base.UserEventHandler = func(e serf.UserEvent) {
select { select {
@ -787,6 +796,13 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
// Setup the loggers // Setup the loggers
base.LogOutput = a.LogOutput base.LogOutput = a.LogOutput
if !a.config.DisableKeyringFile {
if err := a.setupKeyrings(base); err != nil {
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}
}
return base, nil return base, nil
} }
@ -897,42 +913,6 @@ func (a *Agent) resolveTmplAddrs() error {
return nil return nil
} }
// makeServer creates a new consul server.
func (a *Agent) makeServer() (*consul.Server, error) {
config, err := a.consulConfig()
if err != nil {
return nil, err
}
if !a.config.DisableKeyringFile {
if err := a.setupKeyrings(config); err != nil {
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}
}
server, err := consul.NewServerLogger(config, a.logger)
if err != nil {
return nil, fmt.Errorf("Failed to start Consul server: %v", err)
}
return server, nil
}
// makeClient creates a new consul client.
func (a *Agent) makeClient() (*consul.Client, error) {
config, err := a.consulConfig()
if err != nil {
return nil, err
}
if !a.config.DisableKeyringFile {
if err := a.setupKeyrings(config); err != nil {
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}
}
client, err := consul.NewClientLogger(config, a.logger)
if err != nil {
return nil, fmt.Errorf("Failed to start Consul client: %v", err)
}
return client, nil
}
// makeRandomID will generate a random UUID for a node. // makeRandomID will generate a random UUID for a node.
func (a *Agent) makeRandomID() (string, error) { func (a *Agent) makeRandomID() (string, error) {
id, err := uuid.GenerateUUID() id, err := uuid.GenerateUUID()
@ -1644,7 +1624,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
ttl := &CheckTTL{ ttl := &CheckTTL{
Notify: &a.state, Notify: a.state,
CheckID: check.CheckID, CheckID: check.CheckID,
TTL: chkType.TTL, TTL: chkType.TTL,
Logger: a.logger, Logger: a.logger,
@ -1670,7 +1650,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
http := &CheckHTTP{ http := &CheckHTTP{
Notify: &a.state, Notify: a.state,
CheckID: check.CheckID, CheckID: check.CheckID,
HTTP: chkType.HTTP, HTTP: chkType.HTTP,
Header: chkType.Header, Header: chkType.Header,
@ -1694,7 +1674,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
tcp := &CheckTCP{ tcp := &CheckTCP{
Notify: &a.state, Notify: a.state,
CheckID: check.CheckID, CheckID: check.CheckID,
TCP: chkType.TCP, TCP: chkType.TCP,
Interval: chkType.Interval, Interval: chkType.Interval,
@ -1715,7 +1695,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
dockerCheck := &CheckDocker{ dockerCheck := &CheckDocker{
Notify: &a.state, Notify: a.state,
CheckID: check.CheckID, CheckID: check.CheckID,
DockerContainerID: chkType.DockerContainerID, DockerContainerID: chkType.DockerContainerID,
Shell: chkType.Shell, Shell: chkType.Shell,
@ -1739,7 +1719,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *structs.CheckType,
} }
monitor := &CheckMonitor{ monitor := &CheckMonitor{
Notify: &a.state, Notify: a.state,
CheckID: check.CheckID, CheckID: check.CheckID,
Script: chkType.Script, Script: chkType.Script,
Interval: chkType.Interval, Interval: chkType.Interval,

33
agent/local.go

@ -74,22 +74,23 @@ type localState struct {
triggerCh chan struct{} triggerCh chan struct{}
} }
// Init is used to initialize the local state // NewLocalState creates a is used to initialize the local state
func (l *localState) Init(c *Config, lg *log.Logger, d delegate) { func NewLocalState(c *Config, lg *log.Logger) *localState {
l.config = c return &localState{
l.delegate = d config: c,
l.logger = lg logger: lg,
l.services = make(map[string]*structs.NodeService) services: make(map[string]*structs.NodeService),
l.serviceStatus = make(map[string]syncStatus) serviceStatus: make(map[string]syncStatus),
l.serviceTokens = make(map[string]string) serviceTokens: make(map[string]string),
l.checks = make(map[types.CheckID]*structs.HealthCheck) checks: make(map[types.CheckID]*structs.HealthCheck),
l.checkStatus = make(map[types.CheckID]syncStatus) checkStatus: make(map[types.CheckID]syncStatus),
l.checkTokens = make(map[types.CheckID]string) checkTokens: make(map[types.CheckID]string),
l.checkCriticalTime = make(map[types.CheckID]time.Time) checkCriticalTime: make(map[types.CheckID]time.Time),
l.deferCheck = make(map[types.CheckID]*time.Timer) deferCheck: make(map[types.CheckID]*time.Timer),
l.metadata = make(map[string]string) metadata: make(map[string]string),
l.consulCh = make(chan struct{}, 1) consulCh: make(chan struct{}, 1),
l.triggerCh = make(chan struct{}, 1) triggerCh: make(chan struct{}, 1),
}
} }
// changeMade is used to trigger an anti-entropy run // changeMade is used to trigger an anti-entropy run

9
agent/local_test.go

@ -1419,8 +1419,7 @@ func TestAgent_serviceTokens(t *testing.T) {
t.Parallel() t.Parallel()
cfg := TestConfig() cfg := TestConfig()
cfg.ACLToken = "default" cfg.ACLToken = "default"
l := new(localState) l := NewLocalState(cfg, nil)
l.Init(cfg, nil, nil)
l.AddService(&structs.NodeService{ l.AddService(&structs.NodeService{
ID: "redis", ID: "redis",
@ -1448,8 +1447,7 @@ func TestAgent_checkTokens(t *testing.T) {
t.Parallel() t.Parallel()
cfg := TestConfig() cfg := TestConfig()
cfg.ACLToken = "default" cfg.ACLToken = "default"
l := new(localState) l := NewLocalState(cfg, nil)
l.Init(cfg, nil, nil)
// Returns default when no token is set // Returns default when no token is set
if token := l.CheckToken("mem"); token != "default" { if token := l.CheckToken("mem"); token != "default" {
@ -1472,8 +1470,7 @@ func TestAgent_checkTokens(t *testing.T) {
func TestAgent_checkCriticalTime(t *testing.T) { func TestAgent_checkCriticalTime(t *testing.T) {
t.Parallel() t.Parallel()
cfg := TestConfig() cfg := TestConfig()
l := new(localState) l := NewLocalState(cfg, nil)
l.Init(cfg, nil, nil)
// Add a passing check and make sure it's not critical. // Add a passing check and make sure it's not critical.
checkID := types.CheckID("redis:1") checkID := types.CheckID("redis:1")

Loading…
Cancel
Save