diff --git a/command/agent/agent.go b/command/agent/agent.go index 9fcee037ee..47979f3649 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { Port: agent.config.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService, "") + agent.state.AddService(&consulService) } else { err = agent.setupClient() agent.state.SetIface(agent.client) @@ -520,11 +520,11 @@ func (a *Agent) ResumeSync() { } // persistService saves a service definition to a JSON file in the data dir -func (a *Agent) persistService(service *structs.NodeService, token string) error { +func (a *Agent) persistService(service *structs.NodeService) error { svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID)) if _, err := os.Stat(svcPath); os.IsNotExist(err) { wrapped := &persistedService{ - Token: token, + Token: a.state.ServiceToken(service.ID), Service: service, } encoded, err := json.Marshal(wrapped) @@ -556,14 +556,14 @@ func (a *Agent) purgeService(serviceID string) error { } // persistCheck saves a check definition to the local agent's state directory -func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType, token string) error { +func (a *Agent) persistCheck(check *structs.HealthCheck, chkType *CheckType) error { checkPath := filepath.Join(a.config.DataDir, checksDir, stringHash(check.CheckID)) if _, err := os.Stat(checkPath); !os.IsNotExist(err) { return err } // Create the persisted check - p := persistedCheck{check, chkType, token} + p := persistedCheck{check, chkType, a.state.CheckToken(check.CheckID)} encoded, err := json.Marshal(p) if err != nil { @@ -595,8 +595,7 @@ func (a *Agent) purgeCheck(checkID string) error { // AddService is used to add a service entry. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, - persist bool, token string) error { +func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { if service.Service == "" { return fmt.Errorf("Service name missing") } @@ -626,11 +625,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, } // Add the service - a.state.AddService(service, token) + a.state.AddService(service) // Persist the service to a file if persist { - if err := a.persistService(service, token); err != nil { + if err := a.persistService(service); err != nil { return err } } @@ -650,7 +649,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, ServiceID: service.ID, ServiceName: service.Service, } - if err := a.AddCheck(check, chkType, persist, token); err != nil { + if err := a.AddCheck(check, chkType, persist); err != nil { return err } } @@ -701,8 +700,7 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error { // This entry is persistent and the agent will make a best effort to // ensure it is registered. The Check may include a CheckType which // is used to automatically update the check status -func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, - persist bool, token string) error { +func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { if check.CheckID == "" { return fmt.Errorf("CheckID missing") } @@ -781,11 +779,11 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, } // Add to the local state for anti-entropy - a.state.AddCheck(check, token) + a.state.AddCheck(check) // Persist the check if persist { - return a.persistCheck(check, chkType, token) + return a.persistCheck(check, chkType) } return nil @@ -926,7 +924,7 @@ func (a *Agent) loadServices(conf *Config) error { for _, service := range conf.Services { ns := service.NodeService() chkTypes := service.CheckTypes() - if err := a.AddService(ns, chkTypes, false, service.Token); err != nil { + if err := a.AddService(ns, chkTypes, false); err != nil { return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) } } @@ -959,6 +957,7 @@ func (a *Agent) loadServices(conf *Config) error { return err } svc := wrapped.Service + a.state.AddServiceToken(svc.ID, wrapped.Token) if _, ok := a.state.services[svc.ID]; ok { // Purge previously persisted service. This allows config to be @@ -969,7 +968,7 @@ func (a *Agent) loadServices(conf *Config) error { } else { a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", svc.ID, filePath) - return a.AddService(svc, nil, false, wrapped.Token) + return a.AddService(svc, nil, false) } }) @@ -998,7 +997,7 @@ func (a *Agent) loadChecks(conf *Config) error { for _, check := range conf.Checks { health := check.HealthCheck(conf.NodeName) chkType := &check.CheckType - if err := a.AddCheck(health, chkType, false, ""); err != nil { + if err := a.AddCheck(health, chkType, false); err != nil { return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) } } @@ -1042,7 +1041,7 @@ func (a *Agent) loadChecks(conf *Config) error { // services into the active pool p.Check.Status = structs.HealthCritical - if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil { + if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { // Purge the check if it is unable to be restored. a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", p.Check.CheckID, err) @@ -1119,7 +1118,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { ServiceName: service.Service, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true, "") + a.AddCheck(check, nil, true) a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) return nil @@ -1165,7 +1164,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) { Notes: reason, Status: structs.HealthCritical, } - a.AddCheck(check, nil, true, "") + a.AddCheck(check, nil, true) a.logger.Printf("[INFO] agent: Node entered maintenance mode") } diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 18a8556205..d314e7d4cb 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -100,9 +100,10 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ // Get the provided token, if any var token string s.parseToken(req, &token) + s.agent.state.AddCheckToken(health.CheckID, token) // Add the check - if err := s.agent.AddCheck(health, chkType, true, token); err != nil { + if err := s.agent.AddCheck(health, chkType, true); err != nil { return nil, err } s.syncChanges() @@ -206,9 +207,10 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re // Get the provided token, if any var token string s.parseToken(req, &token) + s.agent.state.AddServiceToken(ns.ID, token) // Add the check - if err := s.agent.AddService(ns, chkTypes, true, token); err != nil { + if err := s.agent.AddService(ns, chkTypes, true); err != nil { return nil, err } s.syncChanges() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 33db4f6e2b..af138bb7ce 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -684,7 +684,7 @@ func TestAgent_PersistCheck(t *testing.T) { t.Fatalf("err: %s", err) } - p := persistedCheck{check, chkType} + p := persistedCheck{check, chkType, ""} expected, err := json.Marshal(p) if err != nil { t.Fatalf("err: %s", err) diff --git a/command/agent/local.go b/command/agent/local.go index d8790b30b2..4211d72c9f 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -36,7 +36,7 @@ type localState struct { // element due to a go bug. paused int32 - sync.Mutex + sync.RWMutex logger *log.Logger // Config is the agent config @@ -123,10 +123,30 @@ func (l *localState) isPaused() bool { return atomic.LoadInt32(&l.paused) == 1 } +// AddServiceToken configures the provided token for the service ID. +// The token will be used to perform service registration operations. +func (l *localState) AddServiceToken(id, token string) { + l.Lock() + defer l.Unlock() + l.serviceTokens[id] = token +} + +// ServiceToken returns the configured ACL token for the given +// service ID. If none is present, the agent's token is returned. +func (l *localState) ServiceToken(id string) string { + l.RLock() + defer l.RUnlock() + token := l.serviceTokens[id] + if token == "" { + token = l.config.ACLToken + } + return token +} + // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService, token string) { +func (l *localState) AddService(service *structs.NodeService) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -137,7 +157,6 @@ func (l *localState) AddService(service *structs.NodeService, token string) { l.services[service.ID] = service l.serviceStatus[service.ID] = syncStatus{} - l.serviceTokens[service.ID] = token l.changeMade() } @@ -156,8 +175,8 @@ func (l *localState) RemoveService(serviceID string) { // agent is aware of and are being kept in sync with the server func (l *localState) Services() map[string]*structs.NodeService { services := make(map[string]*structs.NodeService) - l.Lock() - defer l.Unlock() + l.RLock() + defer l.RUnlock() for name, serv := range l.services { services[name] = serv @@ -165,10 +184,30 @@ func (l *localState) Services() map[string]*structs.NodeService { return services } +// AddCheckToken is used to configure an ACL token for a specific +// health check. The token is used during check registration operations. +func (l *localState) AddCheckToken(id, token string) { + l.Lock() + defer l.Unlock() + l.checkTokens[id] = token +} + +// CheckToken is used to return the configured health check token, or +// if none is configured, the default agent ACL token. +func (l *localState) CheckToken(id string) string { + l.RLock() + defer l.RUnlock() + token := l.checkTokens[id] + if token == "" { + token = l.config.ACLToken + } + return token +} + // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck, token string) { +func (l *localState) AddCheck(check *structs.HealthCheck) { // Set the node name check.Node = l.config.NodeName @@ -177,7 +216,6 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) { l.checks[check.CheckID] = check l.checkStatus[check.CheckID] = syncStatus{} - l.checkTokens[check.CheckID] = token l.changeMade() } @@ -239,8 +277,8 @@ func (l *localState) UpdateCheck(checkID, status, output string) { // agent is aware of and are being kept in sync with the server func (l *localState) Checks() map[string]*structs.HealthCheck { checks := make(map[string]*structs.HealthCheck) - l.Lock() - defer l.Unlock() + l.RLock() + defer l.RUnlock() for name, check := range l.checks { checks[name] = check @@ -442,16 +480,11 @@ func (l *localState) deleteService(id string) error { return fmt.Errorf("ServiceID missing") } - token := l.serviceTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, ServiceID: id, - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -468,16 +501,11 @@ func (l *localState) deleteCheck(id string) error { return fmt.Errorf("CheckID missing") } - token := l.checkTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.DeregisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, CheckID: id, - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Deregister", &req, &out) @@ -490,17 +518,12 @@ func (l *localState) deleteCheck(id string) error { // syncService is used to sync a service to the server func (l *localState) syncService(id string) error { - token := l.serviceTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: l.services[id], - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.ServiceToken(id)}, } // If the service has associated checks that are out of sync, @@ -552,18 +575,13 @@ func (l *localState) syncCheck(id string) error { } } - token := l.checkTokens[id] - if token == "" { - token = l.config.ACLToken - } - req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, Address: l.config.AdvertiseAddr, Service: service, Check: l.checks[id], - WriteRequest: structs.WriteRequest{Token: token}, + WriteRequest: structs.WriteRequest{Token: l.CheckToken(id)}, } var out struct{} err := l.iface.RPC("Catalog.Register", &req, &out)